YARN-4455. Support fetching metrics by time range. Contributed by Varun Saxena.

(cherry picked from commit 743e4731781a7d9a4a5f1f09adc510f193182158)
This commit is contained in:
Rohith Sharma K S 2017-07-20 12:16:06 +05:30 committed by Varun Saxena
parent d01a3f1ba2
commit 484d7e9b39
13 changed files with 777 additions and 176 deletions

View File

@ -328,7 +328,7 @@ private static void loadData() throws Exception {
userEntity.setType("entitytype"); userEntity.setType("entitytype");
userEntity.setId("entityid-" + i); userEntity.setId("entityid-" + i);
userEntity.setIdPrefix(11 - i); userEntity.setIdPrefix(11 - i);
userEntity.setCreatedTime(System.currentTimeMillis()); userEntity.setCreatedTime(ts);
userEntities.addEntity(userEntity); userEntities.addEntity(userEntity);
} }
@ -344,7 +344,7 @@ private static void loadData() throws Exception {
flowVersion2, runid2, entity3.getId(), te3); flowVersion2, runid2, entity3.getId(), te3);
hbi.write(cluster, user, flow, flowVersion, runid, hbi.write(cluster, user, flow, flowVersion, runid,
"application_1111111111_1111", userEntities); "application_1111111111_1111", userEntities);
writeApplicationEntities(hbi); writeApplicationEntities(hbi, ts);
hbi.flush(); hbi.flush();
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -353,26 +353,25 @@ private static void loadData() throws Exception {
} }
} }
static void writeApplicationEntities(HBaseTimelineWriterImpl hbi) static void writeApplicationEntities(HBaseTimelineWriterImpl hbi,
throws IOException { long timestamp) throws IOException {
long currentTimeMillis = System.currentTimeMillis();
int count = 1; int count = 1;
for (long i = 1; i <= 3; i++) { for (long i = 1; i <= 3; i++) {
for (int j = 1; j <= 5; j++) { for (int j = 1; j <= 5; j++) {
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
ApplicationId appId = ApplicationId appId =
BuilderUtils.newApplicationId(currentTimeMillis, count++); BuilderUtils.newApplicationId(timestamp, count++);
ApplicationEntity appEntity = new ApplicationEntity(); ApplicationEntity appEntity = new ApplicationEntity();
appEntity.setId(appId.toString()); appEntity.setId(appId.toString());
appEntity.setCreatedTime(currentTimeMillis); appEntity.setCreatedTime(timestamp);
TimelineEvent created = new TimelineEvent(); TimelineEvent created = new TimelineEvent();
created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
created.setTimestamp(currentTimeMillis); created.setTimestamp(timestamp);
appEntity.addEvent(created); appEntity.addEvent(created);
TimelineEvent finished = new TimelineEvent(); TimelineEvent finished = new TimelineEvent();
finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
finished.setTimestamp(currentTimeMillis + i * j); finished.setTimestamp(timestamp + i * j);
appEntity.addEvent(finished); appEntity.addEvent(finished);
te.addEntity(appEntity); te.addEntity(appEntity);
@ -1766,6 +1765,113 @@ public void testGetEntitiesRelationFilters() throws Exception {
} }
} }
private static void verifyMetricCount(TimelineEntity entity,
int expectedMetricsCnt, int expectedMeticsValCnt) {
int metricsValCnt = 0;
for (TimelineMetric m : entity.getMetrics()) {
metricsValCnt += m.getValues().size();
}
assertEquals(expectedMetricsCnt, entity.getMetrics().size());
assertEquals(expectedMeticsValCnt, metricsValCnt);
}
private static void verifyMetricsCount(Set<TimelineEntity> entities,
int expectedMetricsCnt, int expectedMeticsValCnt) {
int metricsCnt = 0;
int metricsValCnt = 0;
for (TimelineEntity entity : entities) {
metricsCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
metricsValCnt += m.getValues().size();
}
}
assertEquals(expectedMetricsCnt, metricsCnt);
assertEquals(expectedMeticsValCnt, metricsValCnt);
}
@Test
public void testGetEntitiesMetricsTimeRange() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 90000) + "&metricstimeend=" + (ts - 80000));
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 9);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 9);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimeend=" +
(ts - 90000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricstimestart=" +
(ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?fields=ALL&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 3);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?fields=ALL&metricslimit=5&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 80000) + "&metricstimeend=" + (ts - 90000));
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
} finally {
client.destroy();
}
}
/** /**
* Tests if specific configs and metrics are retrieve for getEntity call. * Tests if specific configs and metrics are retrieve for getEntity call.
*/ */
@ -2366,4 +2472,87 @@ public void testForFlowRunsPagination() throws Exception {
client.destroy(); client.destroy();
} }
} }
@Test
public void testGetAppsMetricsRange() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000) + "&metricstimeend=" + (ts - 100000));
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 10);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000) + "&metricstimeend=" + (ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
verifyMetricsCount(entities, 5, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
verifyMetricsCount(entities, 5, 12);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 10);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimeend=" +
(ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/apps/application_1111111111_1111?userid=user1&fields=ALL" +
"&flowname=flow_name&flowrunid=1002345678919&metricslimit=100" +
"&metricstimestart=" +(ts - 200000) + "&metricstimeend=" +
(ts - 100000));
resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 3);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 200000));
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
} finally {
client.destroy();
}
}
} }

View File

@ -58,7 +58,8 @@ public static void createSchema(final Configuration conf)
TimelineSchemaCreator.createAllTables(conf, false); TimelineSchemaCreator.createAllTables(conf, false);
} }
public static void loadApps(HBaseTestingUtility util) throws IOException { public static void loadApps(HBaseTestingUtility util, long ts)
throws IOException {
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
String id = "application_1111111111_2222"; String id = "application_1111111111_2222";
@ -92,7 +93,6 @@ public static void loadApps(HBaseTestingUtility util) throws IOException {
entity.addConfigs(conf); entity.addConfigs(conf);
// add metrics // add metrics
Set<TimelineMetric> metrics = new HashSet<>(); Set<TimelineMetric> metrics = new HashSet<>();
long ts = System.currentTimeMillis();
metrics.add(getMetric4(ts)); metrics.add(getMetric4(ts));
TimelineMetric m12 = new TimelineMetric(); TimelineMetric m12 = new TimelineMetric();
@ -137,7 +137,7 @@ public static void loadApps(HBaseTestingUtility util) throws IOException {
entity1.addConfigs(conf1); entity1.addConfigs(conf1);
// add metrics // add metrics
entity1.addMetrics(getMetrics4()); entity1.addMetrics(getMetrics4(ts));
TimelineEvent event11 = new TimelineEvent(); TimelineEvent event11 = new TimelineEvent();
event11.setId("end_event"); event11.setId("end_event");
event11.setTimestamp(ts); event11.setTimestamp(ts);
@ -175,18 +175,17 @@ public static void loadApps(HBaseTestingUtility util) throws IOException {
} }
} }
private static Set<TimelineMetric> getMetrics4() { private static Set<TimelineMetric> getMetrics4(long ts) {
Set<TimelineMetric> metrics1 = new HashSet<>(); Set<TimelineMetric> metrics1 = new HashSet<>();
TimelineMetric m2 = new TimelineMetric(); TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP1_SLOT_MILLIS"); m2.setId("MAP1_SLOT_MILLIS");
long ts1 = System.currentTimeMillis();
Map<Long, Number> metricValues1 = new HashMap<>(); Map<Long, Number> metricValues1 = new HashMap<>();
metricValues1.put(ts1 - 120000, 100000000); metricValues1.put(ts - 120000, 100000000);
metricValues1.put(ts1 - 100000, 200000000); metricValues1.put(ts - 100000, 200000000);
metricValues1.put(ts1 - 80000, 300000000); metricValues1.put(ts - 80000, 300000000);
metricValues1.put(ts1 - 60000, 400000000); metricValues1.put(ts - 60000, 400000000);
metricValues1.put(ts1 - 40000, 50000000000L); metricValues1.put(ts - 40000, 50000000000L);
metricValues1.put(ts1 - 20000, 60000000000L); metricValues1.put(ts - 20000, 60000000000L);
m2.setType(Type.TIME_SERIES); m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues1); m2.setValues(metricValues1);
metrics1.add(m2); metrics1.add(m2);
@ -307,7 +306,7 @@ private static Map<Long, Number> getMetricValues1(long ts) {
return metricValues; return metricValues;
} }
public static void loadEntities(HBaseTestingUtility util) public static void loadEntities(HBaseTestingUtility util, long ts)
throws IOException { throws IOException {
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
@ -332,7 +331,6 @@ public static void loadEntities(HBaseTestingUtility util)
Set<TimelineMetric> metrics = new HashSet<>(); Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric(); TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS"); m1.setId("MAP_SLOT_MILLIS");
long ts = System.currentTimeMillis();
m1.setType(Type.TIME_SERIES); m1.setType(Type.TIME_SERIES);
m1.setValues(getMetricValues1(ts)); m1.setValues(getMetricValues1(ts));
metrics.add(m1); metrics.add(m1);
@ -383,9 +381,8 @@ public static void loadEntities(HBaseTestingUtility util)
Set<TimelineMetric> metrics1 = new HashSet<>(); Set<TimelineMetric> metrics1 = new HashSet<>();
TimelineMetric m2 = new TimelineMetric(); TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP1_SLOT_MILLIS"); m2.setId("MAP1_SLOT_MILLIS");
long ts1 = System.currentTimeMillis();
m2.setType(Type.TIME_SERIES); m2.setType(Type.TIME_SERIES);
m2.setValues(getMetricValues2(ts1)); m2.setValues(getMetricValues2(ts));
metrics1.add(m2); metrics1.add(m2);
entity1.addMetrics(metrics1); entity1.addMetrics(metrics1);
te.addEntity(entity1); te.addEntity(entity1);

View File

@ -86,13 +86,14 @@ public class TestHBaseTimelineStorageApps {
private static HBaseTestingUtility util; private static HBaseTestingUtility util;
private HBaseTimelineReaderImpl reader; private HBaseTimelineReaderImpl reader;
private static final long CURRENT_TIME = System.currentTimeMillis();
@BeforeClass @BeforeClass
public static void setupBeforeClass() throws Exception { public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility(); util = new HBaseTestingUtility();
util.startMiniCluster(); util.startMiniCluster();
DataGeneratorForTest.createSchema(util.getConfiguration()); DataGeneratorForTest.createSchema(util.getConfiguration());
DataGeneratorForTest.loadApps(util); DataGeneratorForTest.loadApps(util, CURRENT_TIME);
} }
@Before @Before
@ -235,13 +236,12 @@ public void testWriteApplicationToHBase() throws Exception {
TimelineMetric m1 = new TimelineMetric(); TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS"); m1.setId("MAP_SLOT_MILLIS");
Map<Long, Number> metricValues = new HashMap<Long, Number>(); Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis(); metricValues.put(CURRENT_TIME - 120000, 100000000);
metricValues.put(ts - 120000, 100000000); metricValues.put(CURRENT_TIME - 100000, 200000000);
metricValues.put(ts - 100000, 200000000); metricValues.put(CURRENT_TIME - 80000, 300000000);
metricValues.put(ts - 80000, 300000000); metricValues.put(CURRENT_TIME - 60000, 400000000);
metricValues.put(ts - 60000, 400000000); metricValues.put(CURRENT_TIME - 40000, 50000000000L);
metricValues.put(ts - 40000, 50000000000L); metricValues.put(CURRENT_TIME - 20000, 60000000000L);
metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES); m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues); m1.setValues(metricValues);
metrics.add(m1); metrics.add(m1);
@ -258,7 +258,7 @@ public void testWriteApplicationToHBase() throws Exception {
TimelineMetric aggMetric = new TimelineMetric(); TimelineMetric aggMetric = new TimelineMetric();
aggMetric.setId("MEM_USAGE"); aggMetric.setId("MEM_USAGE");
Map<Long, Number> aggMetricValues = new HashMap<Long, Number>(); Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
long aggTs = ts; long aggTs = CURRENT_TIME;
aggMetricValues.put(aggTs - 120000, 102400000L); aggMetricValues.put(aggTs - 120000, 102400000L);
aggMetric.setType(Type.SINGLE_VALUE); aggMetric.setType(Type.SINGLE_VALUE);
aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM); aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
@ -379,7 +379,7 @@ public void testWriteApplicationToHBase() throws Exception {
new TimelineReaderContext(cluster, user, flow, runid, appId, new TimelineReaderContext(cluster, user, flow, runid, appId,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(TimelineReader.Field.ALL), Integer.MAX_VALUE)); EnumSet.of(TimelineReader.Field.ALL), Integer.MAX_VALUE, null, null));
assertNotNull(e1); assertNotNull(e1);
// verify attributes // verify attributes
@ -422,7 +422,7 @@ public void testWriteApplicationToHBase() throws Exception {
e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow, e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow,
runid, appId, entity.getType(), entity.getId()), runid, appId, entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(TimelineReader.Field.ALL), 3)); EnumSet.of(TimelineReader.Field.ALL), 3, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(appId, e1.getId()); assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(), assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
@ -443,7 +443,7 @@ public void testWriteApplicationToHBase() throws Exception {
e1 = reader.getEntity( e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appId, new TimelineReaderContext(cluster, user, flow, runid, appId,
entity.getType(), entity.getId()), new TimelineDataToRetrieve( entity.getType(), entity.getId()), new TimelineDataToRetrieve(
null, null, EnumSet.of(TimelineReader.Field.ALL), null)); null, null, EnumSet.of(TimelineReader.Field.ALL), null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(appId, e1.getId()); assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(), assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
@ -464,9 +464,9 @@ public void testWriteApplicationToHBase() throws Exception {
metric.getId().equals("MEM_USAGE")); metric.getId().equals("MEM_USAGE"));
assertEquals(1, metric.getValues().size()); assertEquals(1, metric.getValues().size());
if (metric.getId().equals("MAP_SLOT_MILLIS")) { if (metric.getId().equals("MAP_SLOT_MILLIS")) {
assertTrue(metric.getValues().containsKey(ts - 20000)); assertTrue(metric.getValues().containsKey(CURRENT_TIME - 20000));
assertEquals(metricValues.get(ts - 20000), assertEquals(metricValues.get(CURRENT_TIME - 20000),
metric.getValues().get(ts - 20000)); metric.getValues().get(CURRENT_TIME - 20000));
} }
if (metric.getId().equals("MEM_USAGE")) { if (metric.getId().equals("MEM_USAGE")) {
assertTrue(metric.getValues().containsKey(aggTs - 120000)); assertTrue(metric.getValues().containsKey(aggTs - 120000));
@ -552,11 +552,13 @@ public void testEvents() throws IOException {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
TimelineEntity e2 = reader.getEntity( TimelineEntity e2 = reader.getEntity(
new TimelineReaderContext(cluster, user, null, null, appName, new TimelineReaderContext(cluster, user, null, null, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
assertNotNull(e2); assertNotNull(e2);
assertEquals(e1, e2); assertEquals(e1, e2);
@ -650,7 +652,8 @@ public void testReadApps() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(entity); assertNotNull(entity);
assertEquals(3, entity.getConfigs().size()); assertEquals(3, entity.getConfigs().size());
assertEquals(1, entity.getIsRelatedToEntities().size()); assertEquals(1, entity.getIsRelatedToEntities().size());
@ -659,7 +662,8 @@ public void testReadApps() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
int metricCnt = 0; int metricCnt = 0;
@ -773,17 +777,17 @@ public void testReadAppsByFields() throws Exception {
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null)); null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(3, e1.getConfigs().size()); assertEquals(3, e1.getConfigs().size());
assertEquals(0, e1.getIsRelatedToEntities().size()); assertEquals(0, e1.getIsRelatedToEntities().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null), new TimelineEntityFilters.Builder().build(),
new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null)); null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null,
null, null));
assertEquals(3, es1.size()); assertEquals(3, es1.size());
int metricsCnt = 0; int metricsCnt = 0;
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
@ -812,7 +816,8 @@ public void testReadAppsIsRelatedTo() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().isRelatedTo(irt).build(), new TimelineEntityFilters.Builder().isRelatedTo(irt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -964,7 +969,8 @@ public void testReadAppsRelatesTo() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().relatesTo(rt).build(), new TimelineEntityFilters.Builder().relatesTo(rt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int relatesToCnt = 0; int relatesToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -1202,7 +1208,7 @@ public void testReadAppsConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1216,7 +1222,8 @@ public void testReadAppsConfigFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1234,7 +1241,7 @@ public void testReadAppsConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1254,7 +1261,7 @@ public void testReadAppsConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList2) new TimelineEntityFilters.Builder().configFilters(confFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList3 = new TimelineFilterList( TimelineFilterList confFilterList3 = new TimelineFilterList(
@ -1267,7 +1274,7 @@ public void testReadAppsConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList3) new TimelineEntityFilters.Builder().configFilters(confFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList4 = new TimelineFilterList( TimelineFilterList confFilterList4 = new TimelineFilterList(
@ -1280,7 +1287,7 @@ public void testReadAppsConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList4) new TimelineEntityFilters.Builder().configFilters(confFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList5 = new TimelineFilterList( TimelineFilterList confFilterList5 = new TimelineFilterList(
@ -1293,7 +1300,7 @@ public void testReadAppsConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList5) new TimelineEntityFilters.Builder().configFilters(confFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1309,7 +1316,8 @@ public void testReadAppsEventFilters() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().eventFilters(ef).build(), new TimelineEntityFilters.Builder().eventFilters(ef).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int eventCnt = 0; int eventCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -1431,7 +1439,7 @@ public void testReadAppsConfigPrefix() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getConfigs().size()); assertEquals(1, e1.getConfigs().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
@ -1439,7 +1447,7 @@ public void testReadAppsConfigPrefix() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null) , null) ,
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
cfgCnt += entity.getConfigs().size(); cfgCnt += entity.getConfigs().size();
@ -1465,7 +1473,7 @@ public void testReadAppsConfigFilterPrefix() throws Exception {
null), null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1498,7 +1506,8 @@ public void testReadAppsConfigFilterPrefix() throws Exception {
null), null),
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(confsToRetrieve, null, null, null)); new TimelineDataToRetrieve(confsToRetrieve, null, null, null, null,
null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1530,7 +1539,7 @@ public void testReadAppsMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1544,7 +1553,8 @@ public void testReadAppsMetricFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1564,7 +1574,7 @@ public void testReadAppsMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1584,7 +1594,7 @@ public void testReadAppsMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList2) new TimelineEntityFilters.Builder().metricFilters(metricFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList( TimelineFilterList metricFilterList3 = new TimelineFilterList(
@ -1597,7 +1607,7 @@ public void testReadAppsMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList3) new TimelineEntityFilters.Builder().metricFilters(metricFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList4 = new TimelineFilterList( TimelineFilterList metricFilterList4 = new TimelineFilterList(
@ -1610,7 +1620,7 @@ public void testReadAppsMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList4) new TimelineEntityFilters.Builder().metricFilters(metricFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList5 = new TimelineFilterList( TimelineFilterList metricFilterList5 = new TimelineFilterList(
@ -1623,7 +1633,7 @@ public void testReadAppsMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList5) new TimelineEntityFilters.Builder().metricFilters(metricFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1636,7 +1646,7 @@ public void testReadAppsMetricPrefix() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getMetrics().size()); assertEquals(1, e1.getMetrics().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
@ -1644,7 +1654,7 @@ public void testReadAppsMetricPrefix() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
metricCnt += entity.getMetrics().size(); metricCnt += entity.getMetrics().size();
@ -1670,7 +1680,7 @@ public void testReadAppsMetricFilterPrefix() throws Exception {
null), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
int metricCnt = 0; int metricCnt = 0;
assertEquals(1, entities.size()); assertEquals(1, entities.size());
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1696,7 +1706,8 @@ public void testReadAppsMetricFilterPrefix() throws Exception {
null), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); new TimelineDataToRetrieve(null, metricsToRetrieve, null, null, null,
null));
metricCnt = 0; metricCnt = 0;
assertEquals(2, entities.size()); assertEquals(2, entities.size());
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1713,8 +1724,8 @@ public void testReadAppsMetricFilterPrefix() throws Exception {
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, new TimelineDataToRetrieve(null, metricsToRetrieve,
metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE)); EnumSet.of(Field.METRICS), Integer.MAX_VALUE, null, null));
metricCnt = 0; metricCnt = 0;
int metricValCnt = 0; int metricValCnt = 0;
assertEquals(2, entities.size()); assertEquals(2, entities.size());
@ -1730,6 +1741,86 @@ public void testReadAppsMetricFilterPrefix() throws Exception {
assertEquals(7, metricValCnt); assertEquals(7, metricValCnt);
} }
@Test
public void testReadAppsMetricTimeRange() throws Exception {
Set<TimelineEntity> entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, null, null));
assertEquals(3, entities.size());
int metricTimeSeriesCnt = 0;
int metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(13, metricTimeSeriesCnt);
entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, CURRENT_TIME - 40000, CURRENT_TIME));
assertEquals(3, entities.size());
metricCnt = 0;
metricTimeSeriesCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(5, metricTimeSeriesCnt);
entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null, CURRENT_TIME - 40000, CURRENT_TIME));
assertEquals(3, entities.size());
metricCnt = 0;
metricTimeSeriesCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(3, metricTimeSeriesCnt);
TimelineEntity entity = reader.getEntity(new TimelineReaderContext(
"cluster1", "user1", "some_flow_name", 1002345678919L,
"application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), 100,
CURRENT_TIME - 40000, CURRENT_TIME));
assertNotNull(entity);
assertEquals(2, entity.getMetrics().size());
metricTimeSeriesCnt = 0;
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
assertEquals(3, metricTimeSeriesCnt);
}
@Test @Test
public void testReadAppsInfoFilters() throws Exception { public void testReadAppsInfoFilters() throws Exception {
TimelineFilterList list1 = new TimelineFilterList(); TimelineFilterList list1 = new TimelineFilterList();
@ -1749,7 +1840,8 @@ public void testReadAppsInfoFilters() throws Exception {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(), new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int infoCnt = 0; int infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1766,7 +1858,8 @@ public void testReadAppsInfoFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList1) new TimelineEntityFilters.Builder().infoFilters(infoFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
infoCnt = 0; infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1785,7 +1878,8 @@ public void testReadAppsInfoFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList2) new TimelineEntityFilters.Builder().infoFilters(infoFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList3 = new TimelineFilterList( TimelineFilterList infoFilterList3 = new TimelineFilterList(
@ -1797,7 +1891,8 @@ public void testReadAppsInfoFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList3) new TimelineEntityFilters.Builder().infoFilters(infoFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList4 = new TimelineFilterList( TimelineFilterList infoFilterList4 = new TimelineFilterList(
@ -1809,7 +1904,8 @@ public void testReadAppsInfoFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList4) new TimelineEntityFilters.Builder().infoFilters(infoFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList5 = new TimelineFilterList( TimelineFilterList infoFilterList5 = new TimelineFilterList(
@ -1821,7 +1917,8 @@ public void testReadAppsInfoFilters() throws Exception {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList5) new TimelineEntityFilters.Builder().infoFilters(infoFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }

View File

@ -93,13 +93,14 @@ public class TestHBaseTimelineStorageEntities {
private static HBaseTestingUtility util; private static HBaseTestingUtility util;
private HBaseTimelineReaderImpl reader; private HBaseTimelineReaderImpl reader;
private static final long CURRENT_TIME = System.currentTimeMillis();
@BeforeClass @BeforeClass
public static void setupBeforeClass() throws Exception { public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility(); util = new HBaseTestingUtility();
util.startMiniCluster(); util.startMiniCluster();
DataGeneratorForTest.createSchema(util.getConfiguration()); DataGeneratorForTest.createSchema(util.getConfiguration());
DataGeneratorForTest.loadEntities(util); DataGeneratorForTest.loadEntities(util, CURRENT_TIME);
} }
@Before @Before
@ -294,13 +295,13 @@ public void testWriteEntityToHBase() throws Exception {
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL),
Integer.MAX_VALUE)); Integer.MAX_VALUE, null, null));
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), null), entity.getType(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL),
Integer.MAX_VALUE)); Integer.MAX_VALUE, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
@ -331,7 +332,8 @@ public void testWriteEntityToHBase() throws Exception {
e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow, e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow,
runid, appName, entity.getType(), entity.getId()), runid, appName, entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(id, e1.getId()); assertEquals(id, e1.getId());
assertEquals(type, e1.getType()); assertEquals(type, e1.getType());
@ -449,12 +451,14 @@ public void testEventsWithEmptyInfo() throws IOException {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), null), entity.getType(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
@ -513,7 +517,8 @@ public void testEventsEscapeTs() throws IOException {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
// check the events // check the events
NavigableSet<TimelineEvent> events = e1.getEvents(); NavigableSet<TimelineEvent> events = e1.getEvents();
@ -542,7 +547,8 @@ public void testReadEntities() throws Exception {
TimelineEntity entity = reader.getEntity( TimelineEntity entity = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(entity); assertNotNull(entity);
assertEquals(3, entity.getConfigs().size()); assertEquals(3, entity.getConfigs().size());
assertEquals(1, entity.getIsRelatedToEntities().size()); assertEquals(1, entity.getIsRelatedToEntities().size());
@ -550,7 +556,8 @@ public void testReadEntities() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", 1002345678919L, "application_1231111111_1111", "world",
null), new TimelineEntityFilters.Builder().build(), null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
int metricCnt = 0; int metricCnt = 0;
@ -677,7 +684,8 @@ public void testReadEntitiesEventFilters() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().eventFilters(ef).build(), new TimelineEntityFilters.Builder().eventFilters(ef).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int eventCnt = 0; int eventCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -797,7 +805,8 @@ public void testReadEntitiesIsRelatedTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().isRelatedTo(irt).build(), new TimelineEntityFilters.Builder().isRelatedTo(irt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -939,7 +948,8 @@ public void testReadEntitiesRelatesTo() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().relatesTo(rt).build(), new TimelineEntityFilters.Builder().relatesTo(rt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int relatesToCnt = 0; int relatesToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -1134,7 +1144,7 @@ public void testReadEntitiesByFields() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null)); null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(3, e1.getConfigs().size()); assertEquals(3, e1.getConfigs().size());
assertEquals(0, e1.getIsRelatedToEntities().size()); assertEquals(0, e1.getIsRelatedToEntities().size());
@ -1142,8 +1152,8 @@ public void testReadEntitiesByFields() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(null, null, EnumSet.of(Field.IS_RELATED_TO,
null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null)); Field.METRICS), null, null, null));
assertEquals(3, es1.size()); assertEquals(3, es1.size());
int metricsCnt = 0; int metricsCnt = 0;
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
@ -1166,14 +1176,14 @@ public void testReadEntitiesConfigPrefix() throws Exception {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getConfigs().size()); assertEquals(1, e1.getConfigs().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
cfgCnt += entity.getConfigs().size(); cfgCnt += entity.getConfigs().size();
@ -1205,7 +1215,7 @@ public void testReadEntitiesConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1218,7 +1228,8 @@ public void testReadEntitiesConfigFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1235,7 +1246,7 @@ public void testReadEntitiesConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1254,7 +1265,7 @@ public void testReadEntitiesConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList2) new TimelineEntityFilters.Builder().configFilters(confFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList3 = new TimelineFilterList( TimelineFilterList confFilterList3 = new TimelineFilterList(
@ -1266,7 +1277,7 @@ public void testReadEntitiesConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList3) new TimelineEntityFilters.Builder().configFilters(confFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList4 = new TimelineFilterList( TimelineFilterList confFilterList4 = new TimelineFilterList(
@ -1278,7 +1289,7 @@ public void testReadEntitiesConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList4) new TimelineEntityFilters.Builder().configFilters(confFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList5 = new TimelineFilterList( TimelineFilterList confFilterList5 = new TimelineFilterList(
@ -1290,7 +1301,7 @@ public void testReadEntitiesConfigFilters() throws Exception {
new TimelineEntityFilters.Builder().configFilters(confFilterList5) new TimelineEntityFilters.Builder().configFilters(confFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1307,7 +1318,7 @@ public void testReadEntitiesConfigFilterPrefix() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1338,7 +1349,8 @@ public void testReadEntitiesConfigFilterPrefix() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(confsToRetrieve, null, null, null)); new TimelineDataToRetrieve(confsToRetrieve, null, null, null, null,
null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1359,14 +1371,14 @@ public void testReadEntitiesMetricPrefix() throws Exception {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getMetrics().size()); assertEquals(1, e1.getMetrics().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
metricCnt += entity.getMetrics().size(); metricCnt += entity.getMetrics().size();
@ -1378,6 +1390,63 @@ public void testReadEntitiesMetricPrefix() throws Exception {
assertEquals(2, metricCnt); assertEquals(2, metricCnt);
} }
@Test
public void testReadEntitiesMetricTimeRange() throws Exception {
Set<TimelineEntity> entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, null, null));
assertEquals(3, entities.size());
int metricTimeSeriesCnt = 0;
int metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(13, metricTimeSeriesCnt);
entities = reader.getEntities(new TimelineReaderContext("cluster1", "user1",
"some_flow_name", 1002345678919L, "application_1231111111_1111",
"world", null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, CURRENT_TIME - 40000, CURRENT_TIME));
assertEquals(3, entities.size());
metricCnt = 0;
metricTimeSeriesCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(5, metricTimeSeriesCnt);
TimelineEntity entity = reader.getEntity(new TimelineReaderContext(
"cluster1", "user1", "some_flow_name", 1002345678919L,
"application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), 100,
CURRENT_TIME - 40000, CURRENT_TIME));
assertNotNull(entity);
assertEquals(2, entity.getMetrics().size());
metricTimeSeriesCnt = 0;
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
assertEquals(3, metricTimeSeriesCnt);
}
@Test @Test
public void testReadEntitiesMetricFilters() throws Exception { public void testReadEntitiesMetricFilters() throws Exception {
TimelineFilterList list1 = new TimelineFilterList(); TimelineFilterList list1 = new TimelineFilterList();
@ -1396,7 +1465,7 @@ public void testReadEntitiesMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1409,7 +1478,8 @@ public void testReadEntitiesMetricFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1428,7 +1498,7 @@ public void testReadEntitiesMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1447,7 +1517,7 @@ public void testReadEntitiesMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList2) new TimelineEntityFilters.Builder().metricFilters(metricFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList( TimelineFilterList metricFilterList3 = new TimelineFilterList(
@ -1459,7 +1529,7 @@ public void testReadEntitiesMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList3) new TimelineEntityFilters.Builder().metricFilters(metricFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList4 = new TimelineFilterList( TimelineFilterList metricFilterList4 = new TimelineFilterList(
@ -1471,7 +1541,7 @@ public void testReadEntitiesMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList4) new TimelineEntityFilters.Builder().metricFilters(metricFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList5 = new TimelineFilterList( TimelineFilterList metricFilterList5 = new TimelineFilterList(
@ -1483,7 +1553,7 @@ public void testReadEntitiesMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList5) new TimelineEntityFilters.Builder().metricFilters(metricFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1500,7 +1570,7 @@ public void testReadEntitiesMetricFilterPrefix() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1530,7 +1600,7 @@ public void testReadEntitiesMetricFilterPrefix() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, metricsToRetrieve, EnumSet.of(Field.METRICS), null)); null, metricsToRetrieve, EnumSet.of(Field.METRICS), null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1549,8 +1619,8 @@ public void testReadEntitiesMetricFilterPrefix() throws Exception {
"world", null), "world", null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, new TimelineDataToRetrieve(null, metricsToRetrieve,
metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE)); EnumSet.of(Field.METRICS), Integer.MAX_VALUE, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
int metricValCnt = 0; int metricValCnt = 0;
@ -1584,7 +1654,8 @@ public void testReadEntitiesInfoFilters() throws Exception {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(), new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int infoCnt = 0; int infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1600,7 +1671,8 @@ public void testReadEntitiesInfoFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList1) new TimelineEntityFilters.Builder().infoFilters(infoFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
infoCnt = 0; infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1618,7 +1690,8 @@ public void testReadEntitiesInfoFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList2) new TimelineEntityFilters.Builder().infoFilters(infoFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList3 = new TimelineFilterList( TimelineFilterList infoFilterList3 = new TimelineFilterList(
@ -1629,7 +1702,8 @@ public void testReadEntitiesInfoFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList3) new TimelineEntityFilters.Builder().infoFilters(infoFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList4 = new TimelineFilterList( TimelineFilterList infoFilterList4 = new TimelineFilterList(
@ -1640,7 +1714,8 @@ public void testReadEntitiesInfoFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList4) new TimelineEntityFilters.Builder().infoFilters(infoFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList5 = new TimelineFilterList( TimelineFilterList infoFilterList5 = new TimelineFilterList(
@ -1651,7 +1726,8 @@ public void testReadEntitiesInfoFilters() throws Exception {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList5) new TimelineEntityFilters.Builder().infoFilters(infoFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }

View File

@ -584,7 +584,8 @@ Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
TimelineEntity entity = hbr.getEntity( TimelineEntity entity = hbr.getEntity(
new TimelineReaderContext(cluster, user, flow, 1002345678919L, null, new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null), TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); new TimelineDataToRetrieve(null, metricsToRetrieve, null, null, null,
null));
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics(); Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(1, metrics.size()); assertEquals(1, metrics.size());
@ -609,7 +610,8 @@ Operator.OR, new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
new TimelineReaderContext(cluster, user, flow, null, null, new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null), TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); new TimelineDataToRetrieve(null, metricsToRetrieve, null, null, null,
null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -681,7 +683,7 @@ public void testWriteFlowRunsMetricFields() throws Exception {
TimelineEntityType.YARN_FLOW_RUN.toString(), null), TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
@ -948,7 +950,7 @@ public void testMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -966,7 +968,7 @@ public void testMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -983,7 +985,7 @@ public void testMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList2) new TimelineEntityFilters.Builder().metricFilters(metricFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList( TimelineFilterList metricFilterList3 = new TimelineFilterList(
@ -994,7 +996,7 @@ public void testMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList3) new TimelineEntityFilters.Builder().metricFilters(metricFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList list3 = new TimelineFilterList(); TimelineFilterList list3 = new TimelineFilterList();
@ -1016,7 +1018,7 @@ public void testMetricFilters() throws Exception {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList4) new TimelineEntityFilters.Builder().metricFilters(metricFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, metricsToRetrieve, new TimelineDataToRetrieve(null, metricsToRetrieve,
EnumSet.of(Field.ALL), null)); EnumSet.of(Field.ALL), null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {

View File

@ -190,7 +190,6 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap = NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
resultMap.get(columnFamilyBytes); resultMap.get(columnFamilyBytes);
// could be that there is no such column family. // could be that there is no such column family.
if (columnCellMap != null) { if (columnCellMap != null) {
for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
@ -308,4 +309,20 @@ public static boolean isIntegralValue(Object obj) {
return (obj instanceof Short) || (obj instanceof Integer) || return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long); (obj instanceof Long);
} }
public static void setMetricsTimeRange(Query query, byte[] metricsCf,
long tsBegin, long tsEnd) {
if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) {
long supplementedTsBegin = tsBegin == 0 ? 0 :
TimestampGenerator.getSupplementedTimestamp(tsBegin, null);
long supplementedTsEnd =
(tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE :
TimestampGenerator.getSupplementedTimestamp(tsEnd + 1, null);
// Handle overflow by resetting time begin to 0 and time end to
// Long#MAX_VALUE, if required.
query.setColumnFamilyTimeRange(metricsCf,
((supplementedTsBegin < 0) ? 0 : supplementedTsBegin),
((supplementedTsEnd < 0) ? Long.MAX_VALUE : supplementedTsEnd));
}
}
} }

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -315,6 +316,8 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
context.getFlowName(), context.getFlowRunId(), context.getAppId()); context.getFlowName(), context.getFlowRunId(), context.getAppId());
byte[] rowKey = applicationRowKey.getRowKey(); byte[] rowKey = applicationRowKey.getRowKey();
Get get = new Get(rowKey); Get get = new Get(rowKey);
// Set time range for metric values.
setMetricsTimeRange(get);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) { if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList); get.setFilter(filterList);
@ -357,6 +360,14 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
} }
} }
private void setMetricsTimeRange(Query query) {
// Set time range for metric values.
HBaseTimelineStorageUtils.
setMetricsTimeRange(query, ApplicationColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
}
@Override @Override
protected ResultScanner getResults(Configuration hbaseConf, protected ResultScanner getResults(Configuration hbaseConf,
Connection conn, FilterList filterList) throws IOException { Connection conn, FilterList filterList) throws IOException {
@ -405,6 +416,9 @@ protected ResultScanner getResults(Configuration hbaseConf,
newList.addFilter(filterList); newList.addFilter(filterList);
} }
scan.setFilter(newList); scan.setFilter(newList);
// Set time range for metric values.
setMetricsTimeRange(scan);
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
return getTable().getResultScanner(hbaseConf, conn, scan); return getTable().getResultScanner(hbaseConf, conn, scan);
} }

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -434,8 +435,8 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(), context.getAppId(), context.getEntityType(),
context.getEntityIdPrefix(), context.getEntityId()).getRowKey(); context.getEntityIdPrefix(), context.getEntityId()).getRowKey();
Get get = new Get(rowKey); Get get = new Get(rowKey);
setMetricsTimeRange(get);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) { if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList); get.setFilter(filterList);
@ -468,6 +469,14 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
return result; return result;
} }
private void setMetricsTimeRange(Query query) {
// Set time range for metric values.
HBaseTimelineStorageUtils.
setMetricsTimeRange(query, EntityColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
}
@Override @Override
protected ResultScanner getResults(Configuration hbaseConf, Connection conn, protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException { FilterList filterList) throws IOException {
@ -513,6 +522,7 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
// mode. // mode.
filterList.addFilter(new PageFilter(getFilters().getLimit())); filterList.addFilter(new PageFilter(getFilters().getLimit()));
} }
setMetricsTimeRange(scan);
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) { if (filterList != null && !filterList.getFilters().isEmpty()) {
scan.setFilter(filterList); scan.setFilter(filterList);

View File

@ -57,6 +57,11 @@
* metricsToRetrieve is specified, this limit defines an upper limit to the * metricsToRetrieve is specified, this limit defines an upper limit to the
* number of metrics to return. This parameter is ignored if METRICS are not to * number of metrics to return. This parameter is ignored if METRICS are not to
* be fetched.</li> * be fetched.</li>
* <li><b>metricsTimeStart</b> - Metric values before this timestamp would not
* be retrieved. If null or {@literal <0}, defaults to 0.</li>
* <li><b>metricsTimeEnd</b> - Metric values after this timestamp would not
* be retrieved. If null or {@literal <0}, defaults to {@link Long#MAX_VALUE}.
* </li>
* </ul> * </ul>
*/ */
@Private @Private
@ -66,6 +71,10 @@ public class TimelineDataToRetrieve {
private TimelineFilterList metricsToRetrieve; private TimelineFilterList metricsToRetrieve;
private EnumSet<Field> fieldsToRetrieve; private EnumSet<Field> fieldsToRetrieve;
private Integer metricsLimit; private Integer metricsLimit;
private Long metricsTimeBegin;
private Long metricsTimeEnd;
private static final long DEFAULT_METRICS_BEGIN_TIME = 0L;
private static final long DEFAULT_METRICS_END_TIME = Long.MAX_VALUE;
/** /**
* Default limit of number of metrics to return. * Default limit of number of metrics to return.
@ -73,12 +82,12 @@ public class TimelineDataToRetrieve {
public static final Integer DEFAULT_METRICS_LIMIT = 1; public static final Integer DEFAULT_METRICS_LIMIT = 1;
public TimelineDataToRetrieve() { public TimelineDataToRetrieve() {
this(null, null, null, null); this(null, null, null, null, null, null);
} }
public TimelineDataToRetrieve(TimelineFilterList confs, public TimelineDataToRetrieve(TimelineFilterList confs,
TimelineFilterList metrics, EnumSet<Field> fields, TimelineFilterList metrics, EnumSet<Field> fields,
Integer limitForMetrics) { Integer limitForMetrics, Long metricTimeBegin, Long metricTimeEnd) {
this.confsToRetrieve = confs; this.confsToRetrieve = confs;
this.metricsToRetrieve = metrics; this.metricsToRetrieve = metrics;
this.fieldsToRetrieve = fields; this.fieldsToRetrieve = fields;
@ -91,6 +100,20 @@ public TimelineDataToRetrieve(TimelineFilterList confs,
if (this.fieldsToRetrieve == null) { if (this.fieldsToRetrieve == null) {
this.fieldsToRetrieve = EnumSet.noneOf(Field.class); this.fieldsToRetrieve = EnumSet.noneOf(Field.class);
} }
if (metricTimeBegin == null || metricTimeBegin < 0) {
this.metricsTimeBegin = DEFAULT_METRICS_BEGIN_TIME;
} else {
this.metricsTimeBegin = metricTimeBegin;
}
if (metricTimeEnd == null || metricTimeEnd < 0) {
this.metricsTimeEnd = DEFAULT_METRICS_END_TIME;
} else {
this.metricsTimeEnd = metricTimeEnd;
}
if (this.metricsTimeBegin > this.metricsTimeEnd) {
throw new IllegalArgumentException("metricstimebegin should not be " +
"greater than metricstimeend");
}
} }
public TimelineFilterList getConfsToRetrieve() { public TimelineFilterList getConfsToRetrieve() {
@ -137,6 +160,14 @@ public Integer getMetricsLimit() {
return metricsLimit; return metricsLimit;
} }
public Long getMetricsTimeBegin() {
return this.metricsTimeBegin;
}
public Long getMetricsTimeEnd() {
return metricsTimeEnd;
}
public void setMetricsLimit(Integer limit) { public void setMetricsLimit(Integer limit) {
if (limit == null || limit < 1) { if (limit == null || limit < 1) {
this.metricsLimit = DEFAULT_METRICS_LIMIT; this.metricsLimit = DEFAULT_METRICS_LIMIT;

View File

@ -264,6 +264,11 @@ public TimelineAbout about(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entities
* would not contain metric values before this timestamp(Optional query
* param).
* @param metricsTimeEnd If specified, returned metrics for the entities would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of entities from the * @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified * given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID * fromId. fromId should be taken from the value associated with FROM_ID
@ -299,6 +304,8 @@ public Set<TimelineEntity> getEntities(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -325,7 +332,8 @@ public Set<TimelineEntity> getEntities(
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
fromId), fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or flowrunid"); "createdTime start/end or limit or flowrunid");
@ -406,6 +414,11 @@ public Set<TimelineEntity> getEntities(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entities
* would not contain metric values before this timestamp(Optional query
* param).
* @param metricsTimeEnd If specified, returned metrics for the entities would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of entities from the * @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified * given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID * fromId. fromId should be taken from the value associated with FROM_ID
@ -446,12 +459,14 @@ public Set<TimelineEntity> getEntities(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, appId, entityType, userId, flowName, return getEntities(req, res, null, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -522,6 +537,11 @@ public Set<TimelineEntity> getEntities(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entities
* would not contain metric values before this timestamp(Optional query
* param).
* @param metricsTimeEnd If specified, returned metrics for the entities would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of entities from the * @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified * given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID * fromId. fromId should be taken from the value associated with FROM_ID
@ -563,6 +583,8 @@ public Set<TimelineEntity> getEntities(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -585,7 +607,8 @@ public Set<TimelineEntity> getEntities(
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
fromId), fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or flowrunid"); "createdTime start/end or limit or flowrunid");
@ -627,6 +650,10 @@ public Set<TimelineEntity> getEntities(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entity would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the entity would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -648,7 +675,9 @@ public TimelineEntity getEntity(
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString()); QUERY_STRING_SEP + req.getQueryString());
@ -668,7 +697,8 @@ public TimelineEntity getEntity(
} }
entity = timelineReaderManager.getEntity(context, entity = timelineReaderManager.getEntity(context,
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -722,6 +752,10 @@ public TimelineEntity getEntity(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entity would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the entity would
* not contain metric values after this timestamp(Optional query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -751,10 +785,12 @@ public TimelineEntity getEntity(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, null, appId, entityType, entityId, userId, return getEntity(req, res, null, appId, entityType, entityId, userId,
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit, entityIdPrefix); metricsLimit, metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**
@ -796,6 +832,10 @@ public TimelineEntity getEntity(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entity would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the entity would
* not contain metric values after this timestamp(Optional query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -826,6 +866,8 @@ public TimelineEntity getEntity(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -844,7 +886,8 @@ public TimelineEntity getEntity(
clusterId, userId, flowName, flowRunId, appId, entityType, clusterId, userId, flowName, flowRunId, appId, entityType,
entityIdPrefix, entityId), entityIdPrefix, entityId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -911,7 +954,7 @@ public TimelineEntity getFlowRun(
context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString()); context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString());
entity = timelineReaderManager.getEntity(context, entity = timelineReaderManager.getEntity(context,
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, null, null)); null, metricsToRetrieve, null, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1020,7 +1063,7 @@ public TimelineEntity getFlowRun(
clusterId, userId, flowName, flowRunId, null, clusterId, userId, flowName, flowRunId, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, null, null)); null, metricsToRetrieve, null, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1114,7 +1157,7 @@ public Set<TimelineEntity> getFlowRuns(
limit, createdTimeStart, createdTimeEnd, null, null, null, limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, fromId), null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null)); null, metricsToRetrieve, fields, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or fromId"); "createdTime start/end or limit or fromId");
@ -1264,7 +1307,7 @@ public Set<TimelineEntity> getFlowRuns(
limit, createdTimeStart, createdTimeEnd, null, null, null, limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, fromId), null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null)); null, metricsToRetrieve, fields, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or fromId"); "createdTime start/end or limit or fromId");
@ -1399,7 +1442,7 @@ public Set<TimelineEntity> getFlows(
clusterId, null, null, null, null, clusterId, null, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null),
entityFilters, TimelineReaderWebServicesUtils. entityFilters, TimelineReaderWebServicesUtils.
createTimelineDataToRetrieve(null, null, null, null)); createTimelineDataToRetrieve(null, null, null, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "limit"); handleException(e, url, startTime, "limit");
} }
@ -1440,6 +1483,10 @@ public Set<TimelineEntity> getFlows(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -1461,7 +1508,9 @@ public TimelineEntity getApp(
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString()); QUERY_STRING_SEP + req.getQueryString());
@ -1482,7 +1531,8 @@ public TimelineEntity getApp(
context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString()); context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
entity = timelineReaderManager.getEntity(context, entity = timelineReaderManager.getEntity(context,
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1531,6 +1581,10 @@ public TimelineEntity getApp(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the app would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the app would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -1555,9 +1609,12 @@ public TimelineEntity getApp(
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
return getApp(req, res, null, appId, flowName, flowRunId, userId, return getApp(req, res, null, appId, flowName, flowRunId, userId,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd);
} }
/** /**
@ -1595,6 +1652,10 @@ public TimelineEntity getApp(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the app would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the app would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -1620,7 +1681,9 @@ public TimelineEntity getApp(
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString()); QUERY_STRING_SEP + req.getQueryString());
@ -1638,7 +1701,8 @@ public TimelineEntity getApp(
clusterId, userId, flowName, flowRunId, appId, clusterId, userId, flowName, flowRunId, appId,
TimelineEntityType.YARN_APPLICATION.toString(), null, null), TimelineEntityType.YARN_APPLICATION.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1711,6 +1775,10 @@ public TimelineEntity getApp(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -1745,6 +1813,8 @@ public Set<TimelineEntity> getFlowRunApps(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -1770,7 +1840,8 @@ public Set<TimelineEntity> getFlowRunApps(
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
fromId), fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or flowrunid"); "createdTime start/end or limit or flowrunid");
@ -1844,6 +1915,10 @@ public Set<TimelineEntity> getFlowRunApps(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -1880,12 +1955,15 @@ public Set<TimelineEntity> getFlowRunApps(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, null, return getEntities(req, res, null, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -1949,6 +2027,10 @@ public Set<TimelineEntity> getFlowRunApps(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -1987,12 +2069,15 @@ public Set<TimelineEntity> getFlowRunApps(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, null, return getEntities(req, res, clusterId, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2053,6 +2138,10 @@ public Set<TimelineEntity> getFlowRunApps(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -2088,12 +2177,15 @@ public Set<TimelineEntity> getFlowApps(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, null, return getEntities(req, res, null, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2155,6 +2247,10 @@ public Set<TimelineEntity> getFlowApps(
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -2191,12 +2287,15 @@ public Set<TimelineEntity> getFlowApps(
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, null, return getEntities(req, res, clusterId, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2267,6 +2366,12 @@ public Set<TimelineEntity> getFlowApps(
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempts
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempts
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of application-attempt * @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt * entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2305,12 +2410,15 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId, return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId,
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve, infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve,
metricsToRetrieve, fields, metricsLimit, fromId); metricsToRetrieve, fields, metricsLimit, metricsTimeStart,
metricsTimeEnd, fromId);
} }
/** /**
@ -2382,6 +2490,12 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempts
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempts
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of application-attempt * @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt * entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2421,6 +2535,8 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, appId, return getEntities(req, res, clusterId, appId,
@ -2428,7 +2544,7 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2471,6 +2587,12 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempt
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempt
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2498,10 +2620,12 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName, return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName,
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
entityIdPrefix); metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**
@ -2544,6 +2668,12 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempt
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempt
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2573,11 +2703,13 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, clusterId, appId, return getEntity(req, res, clusterId, appId,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId,
userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit, entityIdPrefix); metricsLimit, metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**
@ -2650,6 +2782,12 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the containers
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the containers
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of container * @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container * entities from the given fromId. The set of container
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2689,12 +2827,14 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getContainers(req, res, null, appId, appattemptId, userId, flowName, return getContainers(req, res, null, appId, appattemptId, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2768,6 +2908,12 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the containers
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the containers
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of container * @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container * entities from the given fromId. The set of container
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2809,6 +2955,8 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String entityType = TimelineEntityType.YARN_CONTAINER.toString(); String entityType = TimelineEntityType.YARN_CONTAINER.toString();
@ -2828,7 +2976,7 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilter, conffilters, metricfilters, eventfilters, isRelatedTo, infofilter, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2870,6 +3018,12 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the container
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the container
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2897,10 +3051,12 @@ public TimelineEntity getContainer(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getContainer(req, res, null, appId, containerId, userId, flowName, return getContainer(req, res, null, appId, containerId, userId, flowName,
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
entityIdPrefix); entityIdPrefix, metricsTimeStart, metricsTimeEnd);
} }
/** /**
@ -2943,6 +3099,12 @@ public TimelineEntity getContainer(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the container
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the container
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2972,11 +3134,13 @@ public TimelineEntity getContainer(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, clusterId, appId, return getEntity(req, res, clusterId, appId,
TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId, TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId,
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit, entityIdPrefix); metricsLimit, metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**

View File

@ -127,11 +127,13 @@ static TimelineEntityFilters createTimelineEntityFilters(String limit,
* @throws TimelineParseException if any problem occurs during parsing. * @throws TimelineParseException if any problem occurs during parsing.
*/ */
static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs, static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs,
String metrics, String fields, String metricsLimit) String metrics, String fields, String metricsLimit,
String metricsTimeBegin, String metricsTimeEnd)
throws TimelineParseException { throws TimelineParseException {
return new TimelineDataToRetrieve(parseDataToRetrieve(confs), return new TimelineDataToRetrieve(parseDataToRetrieve(confs),
parseDataToRetrieve(metrics), parseFieldsStr(fields, parseDataToRetrieve(metrics), parseFieldsStr(fields,
TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit)); TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit),
parseLongStr(metricsTimeBegin), parseLongStr(metricsTimeEnd));
} }
/** /**

View File

@ -319,7 +319,7 @@ public void testGetEntityDefaultView() throws Exception {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", "id_1"), "app", "id_1"),
new TimelineDataToRetrieve(null, null, null, null)); new TimelineDataToRetrieve(null, null, null, null, null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -334,7 +334,7 @@ public void testGetEntityByClusterAndApp() throws Exception {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", null, null, null, "app1", "app", new TimelineReaderContext("cluster1", null, null, null, "app1", "app",
"id_1"), "id_1"),
new TimelineDataToRetrieve(null, null, null, null)); new TimelineDataToRetrieve(null, null, null, null, null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -351,7 +351,7 @@ public void testAppFlowMappingCsv() throws Exception {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", null, null, null, "app2", new TimelineReaderContext("cluster1", null, null, null, "app2",
"app", "id_5"), "app", "id_5"),
new TimelineDataToRetrieve(null, null, null, null)); new TimelineDataToRetrieve(null, null, null, null, null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_5")).toString(), (new TimelineEntity.Identifier("app", "id_5")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -365,7 +365,8 @@ public void testGetEntityCustomFields() throws Exception {
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", "id_1"), "app", "id_1"),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null)); EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null, null,
null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -383,7 +384,8 @@ public void testGetEntityAllFields() throws Exception {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", "id_1"), "app", "id_1"),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -399,7 +401,8 @@ public void testGetAllEntities() throws Exception {
Set<TimelineEntity> result = reader.getEntities( Set<TimelineEntity> result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null), new TimelineEntityFilters.Builder().build(), "app", null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
// All 4 entities will be returned // All 4 entities will be returned
Assert.assertEquals(4, result.size()); Assert.assertEquals(4, result.size());
} }