YARN-4455. Support fetching metrics by time range. Contributed by Varun Saxena.

Rohith Sharma K S 2017-07-20 12:16:06 +05:30 committed by Varun Saxena
parent 660413165a
commit 70078e91e3
13 changed files with 777 additions and 176 deletions
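The new metricstimestart and metricstimeend query parameters, exercised throughout the tests below, let callers restrict which metric values the timeline reader returns. A minimal sketch of issuing such a query with the same Jersey 1.x client classes these tests already use; the port, cluster, application and entity names here are taken from the test fixtures or assumed for illustration, they are not prescribed by this patch.

// Sketch only: fetch entities of type1, keeping at most 100 values per metric,
// restricted to timestamps in [now - 100000, now - 80000].
Client client = Client.create();
long now = System.currentTimeMillis();
URI uri = URI.create("http://localhost:8188/ws/v2/timeline/clusters/cluster1/" +
"apps/application_1111111111_1111/entities/type1?fields=ALL&metricslimit=100" +
"&metricstimestart=" + (now - 100000) + "&metricstimeend=" + (now - 80000));
ClientResponse resp =
client.resource(uri).accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>() { });
client.destroy();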


@@ -328,7 +328,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
userEntity.setType("entitytype");
userEntity.setId("entityid-" + i);
userEntity.setIdPrefix(11 - i);
-userEntity.setCreatedTime(System.currentTimeMillis());
+userEntity.setCreatedTime(ts);
userEntities.addEntity(userEntity);
}
@@ -344,7 +344,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
flowVersion2, runid2, entity3.getId(), te3);
hbi.write(cluster, user, flow, flowVersion, runid,
"application_1111111111_1111", userEntities);
-writeApplicationEntities(hbi);
+writeApplicationEntities(hbi, ts);
hbi.flush();
} finally {
if (hbi != null) {
@@ -353,26 +353,25 @@ public class TestTimelineReaderWebServicesHBaseStorage
}
}
-static void writeApplicationEntities(HBaseTimelineWriterImpl hbi)
-throws IOException {
-long currentTimeMillis = System.currentTimeMillis();
+static void writeApplicationEntities(HBaseTimelineWriterImpl hbi,
+long timestamp) throws IOException {
int count = 1;
for (long i = 1; i <= 3; i++) {
for (int j = 1; j <= 5; j++) {
TimelineEntities te = new TimelineEntities();
ApplicationId appId =
-BuilderUtils.newApplicationId(currentTimeMillis, count++);
+BuilderUtils.newApplicationId(timestamp, count++);
ApplicationEntity appEntity = new ApplicationEntity();
appEntity.setId(appId.toString());
-appEntity.setCreatedTime(currentTimeMillis);
+appEntity.setCreatedTime(timestamp);
TimelineEvent created = new TimelineEvent();
created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-created.setTimestamp(currentTimeMillis);
+created.setTimestamp(timestamp);
appEntity.addEvent(created);
TimelineEvent finished = new TimelineEvent();
finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-finished.setTimestamp(currentTimeMillis + i * j);
+finished.setTimestamp(timestamp + i * j);
appEntity.addEvent(finished);
te.addEntity(appEntity);
@@ -1775,6 +1774,113 @@ public class TestTimelineReaderWebServicesHBaseStorage
}
}
private static void verifyMetricCount(TimelineEntity entity,
int expectedMetricsCnt, int expectedMeticsValCnt) {
int metricsValCnt = 0;
for (TimelineMetric m : entity.getMetrics()) {
metricsValCnt += m.getValues().size();
}
assertEquals(expectedMetricsCnt, entity.getMetrics().size());
assertEquals(expectedMeticsValCnt, metricsValCnt);
}
private static void verifyMetricsCount(Set<TimelineEntity> entities,
int expectedMetricsCnt, int expectedMeticsValCnt) {
int metricsCnt = 0;
int metricsValCnt = 0;
for (TimelineEntity entity : entities) {
metricsCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
metricsValCnt += m.getValues().size();
}
}
assertEquals(expectedMetricsCnt, metricsCnt);
assertEquals(expectedMeticsValCnt, metricsValCnt);
}
@Test
public void testGetEntitiesMetricsTimeRange() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 90000) + "&metricstimeend=" + (ts - 80000));
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 9);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 9);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimeend=" +
(ts - 90000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricstimestart=" +
(ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 5, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?fields=ALL&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 3);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1/entity2?fields=ALL&metricslimit=5&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 80000));
resp = getResponse(client, uri);
entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/apps/application_1111111111_1111/" +
"entities/type1?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 80000) + "&metricstimeend=" + (ts - 90000));
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
} finally {
client.destroy();
}
}
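One detail worth drawing out of the two single-entity queries above: over the same [ts - 100000, ts - 80000] window, the request without metricslimit is expected to return 3 metrics with 3 values, while metricslimit=5 returns 3 metrics with 5 values. This suggests, as an inference from the expected counts rather than a statement in the patch, that without metricslimit only the latest value of each metric inside the window comes back. The contrast, spelled out:

// Inference from the assertions above, not from the patch text itself.
String base = "http://localhost:" + getServerPort() + "/ws/v2/timeline/clusters/" +
"cluster1/apps/application_1111111111_1111/entities/type1/entity2?fields=ALL";
String window = "&metricstimestart=" + (ts - 100000) + "&metricstimeend=" + (ts - 80000);
URI latestValueOnly = URI.create(base + window); // expected: 3 metrics, 3 values
URI upToFiveValues = URI.create(base + "&metricslimit=5" + window); // expected: 3 metrics, 5 values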
/**
* Tests if specific configs and metrics are retrieve for getEntity call.
*/
@@ -2378,4 +2484,87 @@ public class TestTimelineReaderWebServicesHBaseStorage
client.destroy();
}
}
@Test
public void testGetAppsMetricsRange() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000) + "&metricstimeend=" + (ts - 100000));
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 10);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000) + "&metricstimeend=" + (ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
verifyMetricsCount(entities, 5, 5);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(3, entities.size());
verifyMetricsCount(entities, 5, 12);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 200000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 10);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
"1002345678919/apps?fields=ALL&metricslimit=100&metricstimeend=" +
(ts - 100000));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
verifyMetricsCount(entities, 4, 4);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/apps/application_1111111111_1111?userid=user1&fields=ALL" +
"&flowname=flow_name&flowrunid=1002345678919&metricslimit=100" +
"&metricstimestart=" +(ts - 200000) + "&metricstimeend=" +
(ts - 100000));
resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
verifyMetricCount(entity, 3, 3);
uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" +
"timeline/clusters/cluster1/users/user1/flows/flow_name/" +
"apps?fields=ALL&metricslimit=100&metricstimestart=" +
(ts - 100000) + "&metricstimeend=" + (ts - 200000));
verifyHttpResponse(client, uri, Status.BAD_REQUEST);
} finally {
client.destroy();
}
}
}
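Both new web-service tests finish by expecting HTTP 400 when metricstimestart is greater than metricstimeend. A hedged sketch of the parameter check that behaviour implies on the reader side; the method name and message are illustrative and not taken from this commit.

// Illustrative only: the validation the BAD_REQUEST assertions above imply.
static void validateMetricsTimeRange(Long metricsTimeStart, Long metricsTimeEnd) {
if (metricsTimeStart != null && metricsTimeEnd != null
&& metricsTimeStart > metricsTimeEnd) {
// The web service would surface this as an HTTP 400 response.
throw new IllegalArgumentException(
"metricstimestart must not be greater than metricstimeend");
}
}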


@@ -58,7 +58,8 @@ public final class DataGeneratorForTest
TimelineSchemaCreator.createAllTables(conf, false);
}
-public static void loadApps(HBaseTestingUtility util) throws IOException {
+public static void loadApps(HBaseTestingUtility util, long ts)
+throws IOException {
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
String id = "application_1111111111_2222";
@@ -92,7 +93,6 @@ public final class DataGeneratorForTest
entity.addConfigs(conf);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
-long ts = System.currentTimeMillis();
metrics.add(getMetric4(ts));
TimelineMetric m12 = new TimelineMetric();
@@ -137,7 +137,7 @@ public final class DataGeneratorForTest
entity1.addConfigs(conf1);
// add metrics
-entity1.addMetrics(getMetrics4());
+entity1.addMetrics(getMetrics4(ts));
TimelineEvent event11 = new TimelineEvent();
event11.setId("end_event");
event11.setTimestamp(ts);
@@ -175,18 +175,17 @@ public final class DataGeneratorForTest
}
}
-private static Set<TimelineMetric> getMetrics4() {
+private static Set<TimelineMetric> getMetrics4(long ts) {
Set<TimelineMetric> metrics1 = new HashSet<>();
TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP1_SLOT_MILLIS");
-long ts1 = System.currentTimeMillis();
Map<Long, Number> metricValues1 = new HashMap<>();
-metricValues1.put(ts1 - 120000, 100000000);
-metricValues1.put(ts1 - 100000, 200000000);
-metricValues1.put(ts1 - 80000, 300000000);
-metricValues1.put(ts1 - 60000, 400000000);
-metricValues1.put(ts1 - 40000, 50000000000L);
-metricValues1.put(ts1 - 20000, 60000000000L);
+metricValues1.put(ts - 120000, 100000000);
+metricValues1.put(ts - 100000, 200000000);
+metricValues1.put(ts - 80000, 300000000);
+metricValues1.put(ts - 60000, 400000000);
+metricValues1.put(ts - 40000, 50000000000L);
+metricValues1.put(ts - 20000, 60000000000L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues1);
metrics1.add(m2);
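With getMetrics4 now keyed off the caller-supplied ts, the MAP1_SLOT_MILLIS series always holds six points at fixed offsets (ts - 120000 through ts - 20000, every 20 seconds), which is what makes the time-range counts in the reader tests deterministic. A small sketch of how a [metricstimestart, metricstimeend] window selects from such a series; inclusive bounds are an assumption here, though the assertions elsewhere only ever check >= start and <= end.

// Sketch: pick the points of the series above that fall inside a window.
// Uses java.util.TreeMap/NavigableMap; the values are stand-ins for the real ones.
long ts = System.currentTimeMillis();
NavigableMap<Long, Number> series = new TreeMap<>();
for (long offset = 120000; offset >= 20000; offset -= 20000) {
series.put(ts - offset, offset);
}
NavigableMap<Long, Number> window =
series.subMap(ts - 100000, true, ts - 80000, true);
// window holds the points at ts - 100000 and ts - 80000, i.e. 2 of the 6 values.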
@@ -307,7 +306,7 @@ public final class DataGeneratorForTest
return metricValues;
}
-public static void loadEntities(HBaseTestingUtility util)
+public static void loadEntities(HBaseTestingUtility util, long ts)
throws IOException {
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
@@ -332,7 +331,6 @@ public final class DataGeneratorForTest
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS");
-long ts = System.currentTimeMillis();
m1.setType(Type.TIME_SERIES);
m1.setValues(getMetricValues1(ts));
metrics.add(m1);
@@ -383,9 +381,8 @@ public final class DataGeneratorForTest
Set<TimelineMetric> metrics1 = new HashSet<>();
TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP1_SLOT_MILLIS");
-long ts1 = System.currentTimeMillis();
m2.setType(Type.TIME_SERIES);
-m2.setValues(getMetricValues2(ts1));
+m2.setValues(getMetricValues2(ts));
metrics1.add(m2);
entity1.addMetrics(metrics1);
te.addEntity(entity1);


@@ -87,13 +87,14 @@ public class TestHBaseTimelineStorageApps
private static HBaseTestingUtility util;
private HBaseTimelineReaderImpl reader;
+private static final long CURRENT_TIME = System.currentTimeMillis();
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
util.startMiniCluster();
DataGeneratorForTest.createSchema(util.getConfiguration());
-DataGeneratorForTest.loadApps(util);
+DataGeneratorForTest.loadApps(util, CURRENT_TIME);
}
@Before
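From here on, every TimelineDataToRetrieve construction in this file gains two trailing arguments, passed as null in the pre-existing tests. Going by the new testReadAppsMetricTimeRange further down, they carry the metrics time-range bounds. A sketch of a call that supplies them explicitly, modelled on that test; the interpretation of the two parameters is inferred from the test rather than spelled out in these hunks.

// Modelled on testReadAppsMetricTimeRange below: up to 100 values per metric,
// restricted to timestamps in [CURRENT_TIME - 40000, CURRENT_TIME].
TimelineEntity entity = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), 100,
CURRENT_TIME - 40000, CURRENT_TIME));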
@@ -236,13 +237,12 @@ public class TestHBaseTimelineStorageApps
TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS");
Map<Long, Number> metricValues = new HashMap<Long, Number>();
-long ts = System.currentTimeMillis();
-metricValues.put(ts - 120000, 100000000);
-metricValues.put(ts - 100000, 200000000);
-metricValues.put(ts - 80000, 300000000);
-metricValues.put(ts - 60000, 400000000);
-metricValues.put(ts - 40000, 50000000000L);
-metricValues.put(ts - 20000, 60000000000L);
+metricValues.put(CURRENT_TIME - 120000, 100000000);
+metricValues.put(CURRENT_TIME - 100000, 200000000);
+metricValues.put(CURRENT_TIME - 80000, 300000000);
+metricValues.put(CURRENT_TIME - 60000, 400000000);
+metricValues.put(CURRENT_TIME - 40000, 50000000000L);
+metricValues.put(CURRENT_TIME - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
@@ -259,7 +259,7 @@ public class TestHBaseTimelineStorageApps
TimelineMetric aggMetric = new TimelineMetric();
aggMetric.setId("MEM_USAGE");
Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
-long aggTs = ts;
+long aggTs = CURRENT_TIME;
aggMetricValues.put(aggTs - 120000, 102400000L);
aggMetric.setType(Type.SINGLE_VALUE);
aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
@ -380,7 +380,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineReaderContext(cluster, user, flow, runid, appId, new TimelineReaderContext(cluster, user, flow, runid, appId,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(TimelineReader.Field.ALL), Integer.MAX_VALUE)); EnumSet.of(TimelineReader.Field.ALL), Integer.MAX_VALUE, null, null));
assertNotNull(e1); assertNotNull(e1);
// verify attributes // verify attributes
@ -423,7 +423,7 @@ public class TestHBaseTimelineStorageApps {
e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow, e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow,
runid, appId, entity.getType(), entity.getId()), runid, appId, entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(TimelineReader.Field.ALL), 3)); EnumSet.of(TimelineReader.Field.ALL), 3, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(appId, e1.getId()); assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(), assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
@ -444,7 +444,7 @@ public class TestHBaseTimelineStorageApps {
e1 = reader.getEntity( e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appId, new TimelineReaderContext(cluster, user, flow, runid, appId,
entity.getType(), entity.getId()), new TimelineDataToRetrieve( entity.getType(), entity.getId()), new TimelineDataToRetrieve(
null, null, EnumSet.of(TimelineReader.Field.ALL), null)); null, null, EnumSet.of(TimelineReader.Field.ALL), null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(appId, e1.getId()); assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(), assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
@@ -465,9 +465,9 @@ public class TestHBaseTimelineStorageApps
metric.getId().equals("MEM_USAGE"));
assertEquals(1, metric.getValues().size());
if (metric.getId().equals("MAP_SLOT_MILLIS")) {
-assertTrue(metric.getValues().containsKey(ts - 20000));
-assertEquals(metricValues.get(ts - 20000),
-metric.getValues().get(ts - 20000));
+assertTrue(metric.getValues().containsKey(CURRENT_TIME - 20000));
+assertEquals(metricValues.get(CURRENT_TIME - 20000),
+metric.getValues().get(CURRENT_TIME - 20000));
}
if (metric.getId().equals("MEM_USAGE")) {
assertTrue(metric.getValues().containsKey(aggTs - 120000));
@ -554,11 +554,13 @@ public class TestHBaseTimelineStorageApps {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
TimelineEntity e2 = reader.getEntity( TimelineEntity e2 = reader.getEntity(
new TimelineReaderContext(cluster, user, null, null, appName, new TimelineReaderContext(cluster, user, null, null, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
assertNotNull(e2); assertNotNull(e2);
assertEquals(e1, e2); assertEquals(e1, e2);
@ -652,7 +654,8 @@ public class TestHBaseTimelineStorageApps {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(entity); assertNotNull(entity);
assertEquals(3, entity.getConfigs().size()); assertEquals(3, entity.getConfigs().size());
assertEquals(1, entity.getIsRelatedToEntities().size()); assertEquals(1, entity.getIsRelatedToEntities().size());
@ -661,7 +664,8 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
int metricCnt = 0; int metricCnt = 0;
@ -775,17 +779,17 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null)); null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(3, e1.getConfigs().size()); assertEquals(3, e1.getConfigs().size());
assertEquals(0, e1.getIsRelatedToEntities().size()); assertEquals(0, e1.getIsRelatedToEntities().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null), new TimelineEntityFilters.Builder().build(),
new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null)); null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null,
null, null));
assertEquals(3, es1.size()); assertEquals(3, es1.size());
int metricsCnt = 0; int metricsCnt = 0;
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
@ -814,7 +818,8 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().isRelatedTo(irt).build(), new TimelineEntityFilters.Builder().isRelatedTo(irt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -966,7 +971,8 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().relatesTo(rt).build(), new TimelineEntityFilters.Builder().relatesTo(rt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int relatesToCnt = 0; int relatesToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -1204,7 +1210,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1218,7 +1224,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1236,7 +1243,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1256,7 +1263,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().configFilters(confFilterList2) new TimelineEntityFilters.Builder().configFilters(confFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList3 = new TimelineFilterList( TimelineFilterList confFilterList3 = new TimelineFilterList(
@ -1269,7 +1276,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().configFilters(confFilterList3) new TimelineEntityFilters.Builder().configFilters(confFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList4 = new TimelineFilterList( TimelineFilterList confFilterList4 = new TimelineFilterList(
@ -1282,7 +1289,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().configFilters(confFilterList4) new TimelineEntityFilters.Builder().configFilters(confFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList5 = new TimelineFilterList( TimelineFilterList confFilterList5 = new TimelineFilterList(
@ -1295,7 +1302,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().configFilters(confFilterList5) new TimelineEntityFilters.Builder().configFilters(confFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1311,7 +1318,8 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().eventFilters(ef).build(), new TimelineEntityFilters.Builder().eventFilters(ef).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int eventCnt = 0; int eventCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -1433,7 +1441,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getConfigs().size()); assertEquals(1, e1.getConfigs().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
@ -1441,7 +1449,7 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null) , null) ,
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
cfgCnt += entity.getConfigs().size(); cfgCnt += entity.getConfigs().size();
@ -1467,7 +1475,7 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1500,7 +1508,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(confsToRetrieve, null, null, null)); new TimelineDataToRetrieve(confsToRetrieve, null, null, null, null,
null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1532,7 +1541,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1546,7 +1555,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1566,7 +1576,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1586,7 +1596,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList2) new TimelineEntityFilters.Builder().metricFilters(metricFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList( TimelineFilterList metricFilterList3 = new TimelineFilterList(
@ -1599,7 +1609,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList3) new TimelineEntityFilters.Builder().metricFilters(metricFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList4 = new TimelineFilterList( TimelineFilterList metricFilterList4 = new TimelineFilterList(
@ -1612,7 +1622,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList4) new TimelineEntityFilters.Builder().metricFilters(metricFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList5 = new TimelineFilterList( TimelineFilterList metricFilterList5 = new TimelineFilterList(
@ -1625,7 +1635,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList5) new TimelineEntityFilters.Builder().metricFilters(metricFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1638,7 +1648,7 @@ public class TestHBaseTimelineStorageApps {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1111111111_2222", 1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getMetrics().size()); assertEquals(1, e1.getMetrics().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
@ -1646,7 +1656,7 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
metricCnt += entity.getMetrics().size(); metricCnt += entity.getMetrics().size();
@ -1672,7 +1682,7 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
int metricCnt = 0; int metricCnt = 0;
assertEquals(1, entities.size()); assertEquals(1, entities.size());
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1698,7 +1708,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); new TimelineDataToRetrieve(null, metricsToRetrieve, null, null, null,
null));
metricCnt = 0; metricCnt = 0;
assertEquals(2, entities.size()); assertEquals(2, entities.size());
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1715,8 +1726,8 @@ public class TestHBaseTimelineStorageApps {
TimelineEntityType.YARN_APPLICATION.toString(), null), TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, new TimelineDataToRetrieve(null, metricsToRetrieve,
metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE)); EnumSet.of(Field.METRICS), Integer.MAX_VALUE, null, null));
metricCnt = 0; metricCnt = 0;
int metricValCnt = 0; int metricValCnt = 0;
assertEquals(2, entities.size()); assertEquals(2, entities.size());
@@ -1732,6 +1743,86 @@ public class TestHBaseTimelineStorageApps
assertEquals(7, metricValCnt);
}
@Test
public void testReadAppsMetricTimeRange() throws Exception {
Set<TimelineEntity> entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, null, null));
assertEquals(3, entities.size());
int metricTimeSeriesCnt = 0;
int metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(13, metricTimeSeriesCnt);
entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, CURRENT_TIME - 40000, CURRENT_TIME));
assertEquals(3, entities.size());
metricCnt = 0;
metricTimeSeriesCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(5, metricTimeSeriesCnt);
entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null, CURRENT_TIME - 40000, CURRENT_TIME));
assertEquals(3, entities.size());
metricCnt = 0;
metricTimeSeriesCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(3, metricTimeSeriesCnt);
TimelineEntity entity = reader.getEntity(new TimelineReaderContext(
"cluster1", "user1", "some_flow_name", 1002345678919L,
"application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), 100,
CURRENT_TIME - 40000, CURRENT_TIME));
assertNotNull(entity);
assertEquals(2, entity.getMetrics().size());
metricTimeSeriesCnt = 0;
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
assertEquals(3, metricTimeSeriesCnt);
}
@Test
public void testReadAppsInfoFilters() throws Exception {
TimelineFilterList list1 = new TimelineFilterList();
@ -1751,7 +1842,8 @@ public class TestHBaseTimelineStorageApps {
1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(), 1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(), new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int infoCnt = 0; int infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1768,7 +1860,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList1) new TimelineEntityFilters.Builder().infoFilters(infoFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
infoCnt = 0; infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1787,7 +1880,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList2) new TimelineEntityFilters.Builder().infoFilters(infoFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList3 = new TimelineFilterList( TimelineFilterList infoFilterList3 = new TimelineFilterList(
@ -1799,7 +1893,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList3) new TimelineEntityFilters.Builder().infoFilters(infoFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList4 = new TimelineFilterList( TimelineFilterList infoFilterList4 = new TimelineFilterList(
@ -1811,7 +1906,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList4) new TimelineEntityFilters.Builder().infoFilters(infoFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList5 = new TimelineFilterList( TimelineFilterList infoFilterList5 = new TimelineFilterList(
@ -1823,7 +1919,8 @@ public class TestHBaseTimelineStorageApps {
null), null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList5) new TimelineEntityFilters.Builder().infoFilters(infoFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }


@@ -94,13 +94,14 @@ public class TestHBaseTimelineStorageEntities
private static HBaseTestingUtility util;
private HBaseTimelineReaderImpl reader;
+private static final long CURRENT_TIME = System.currentTimeMillis();
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
util.startMiniCluster();
DataGeneratorForTest.createSchema(util.getConfiguration());
-DataGeneratorForTest.loadEntities(util);
+DataGeneratorForTest.loadEntities(util, CURRENT_TIME);
}
@Before
@ -296,13 +297,13 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL),
Integer.MAX_VALUE)); Integer.MAX_VALUE, null, null));
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), null), entity.getType(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL),
Integer.MAX_VALUE)); Integer.MAX_VALUE, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
@ -333,7 +334,8 @@ public class TestHBaseTimelineStorageEntities {
e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow, e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow,
runid, appName, entity.getType(), entity.getId()), runid, appName, entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(id, e1.getId()); assertEquals(id, e1.getId());
assertEquals(type, e1.getType()); assertEquals(type, e1.getType());
@ -451,12 +453,14 @@ public class TestHBaseTimelineStorageEntities {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), null), entity.getType(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
@ -517,7 +521,8 @@ public class TestHBaseTimelineStorageEntities {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName, new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()), entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(e1); assertNotNull(e1);
// check the events // check the events
NavigableSet<TimelineEvent> events = e1.getEvents(); NavigableSet<TimelineEvent> events = e1.getEvents();
@ -546,7 +551,8 @@ public class TestHBaseTimelineStorageEntities {
TimelineEntity entity = reader.getEntity( TimelineEntity entity = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertNotNull(entity); assertNotNull(entity);
assertEquals(3, entity.getConfigs().size()); assertEquals(3, entity.getConfigs().size());
assertEquals(1, entity.getIsRelatedToEntities().size()); assertEquals(1, entity.getIsRelatedToEntities().size());
@ -554,7 +560,8 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", 1002345678919L, "application_1231111111_1111", "world",
null), new TimelineEntityFilters.Builder().build(), null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
int metricCnt = 0; int metricCnt = 0;
@ -681,7 +688,8 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().eventFilters(ef).build(), new TimelineEntityFilters.Builder().eventFilters(ef).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int eventCnt = 0; int eventCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -801,7 +809,8 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().isRelatedTo(irt).build(), new TimelineEntityFilters.Builder().isRelatedTo(irt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -943,7 +952,8 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().relatesTo(rt).build(), new TimelineEntityFilters.Builder().relatesTo(rt).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int relatesToCnt = 0; int relatesToCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -1138,7 +1148,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null)); null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(3, e1.getConfigs().size()); assertEquals(3, e1.getConfigs().size());
assertEquals(0, e1.getIsRelatedToEntities().size()); assertEquals(0, e1.getIsRelatedToEntities().size());
@ -1146,8 +1156,8 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(null, null, EnumSet.of(Field.IS_RELATED_TO,
null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null)); Field.METRICS), null, null, null));
assertEquals(3, es1.size()); assertEquals(3, es1.size());
int metricsCnt = 0; int metricsCnt = 0;
int isRelatedToCnt = 0; int isRelatedToCnt = 0;
@ -1170,14 +1180,14 @@ public class TestHBaseTimelineStorageEntities {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getConfigs().size()); assertEquals(1, e1.getConfigs().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
cfgCnt += entity.getConfigs().size(); cfgCnt += entity.getConfigs().size();
@ -1209,7 +1219,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1222,7 +1232,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1239,7 +1250,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1258,7 +1269,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().configFilters(confFilterList2) new TimelineEntityFilters.Builder().configFilters(confFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList3 = new TimelineFilterList( TimelineFilterList confFilterList3 = new TimelineFilterList(
@ -1270,7 +1281,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().configFilters(confFilterList3) new TimelineEntityFilters.Builder().configFilters(confFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList4 = new TimelineFilterList( TimelineFilterList confFilterList4 = new TimelineFilterList(
@ -1282,7 +1293,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().configFilters(confFilterList4) new TimelineEntityFilters.Builder().configFilters(confFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList confFilterList5 = new TimelineFilterList( TimelineFilterList confFilterList5 = new TimelineFilterList(
@ -1294,7 +1305,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().configFilters(confFilterList5) new TimelineEntityFilters.Builder().configFilters(confFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1311,7 +1322,7 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().configFilters(confFilterList) new TimelineEntityFilters.Builder().configFilters(confFilterList)
.build(), .build(),
new TimelineDataToRetrieve(list, null, null, null)); new TimelineDataToRetrieve(list, null, null, null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int cfgCnt = 0; int cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1342,7 +1353,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().configFilters(confFilterList1) new TimelineEntityFilters.Builder().configFilters(confFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(confsToRetrieve, null, null, null)); new TimelineDataToRetrieve(confsToRetrieve, null, null, null, null,
null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
cfgCnt = 0; cfgCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1363,14 +1375,14 @@ public class TestHBaseTimelineStorageEntities {
TimelineEntity e1 = reader.getEntity( TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", "hello"), 1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, e1.getMetrics().size()); assertEquals(1, e1.getMetrics().size());
Set<TimelineEntity> es1 = reader.getEntities( Set<TimelineEntity> es1 = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : es1) { for (TimelineEntity entity : es1) {
metricCnt += entity.getMetrics().size(); metricCnt += entity.getMetrics().size();
@ -1382,6 +1394,63 @@ public class TestHBaseTimelineStorageEntities {
assertEquals(2, metricCnt); assertEquals(2, metricCnt);
} }
@Test
public void testReadEntitiesMetricTimeRange() throws Exception {
Set<TimelineEntity> entities = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, null, null));
assertEquals(3, entities.size());
int metricTimeSeriesCnt = 0;
int metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(13, metricTimeSeriesCnt);
entities = reader.getEntities(new TimelineReaderContext("cluster1", "user1",
"some_flow_name", 1002345678919L, "application_1231111111_1111",
"world", null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
100, CURRENT_TIME - 40000, CURRENT_TIME));
assertEquals(3, entities.size());
metricCnt = 0;
metricTimeSeriesCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
}
assertEquals(3, metricCnt);
assertEquals(5, metricTimeSeriesCnt);
TimelineEntity entity = reader.getEntity(new TimelineReaderContext(
"cluster1", "user1", "some_flow_name", 1002345678919L,
"application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), 100,
CURRENT_TIME - 40000, CURRENT_TIME));
assertNotNull(entity);
assertEquals(2, entity.getMetrics().size());
metricTimeSeriesCnt = 0;
for (TimelineMetric m : entity.getMetrics()) {
for (Long ts : m.getValues().keySet()) {
assertTrue(ts >= CURRENT_TIME - 40000 && ts <= CURRENT_TIME);
}
metricTimeSeriesCnt += m.getValues().size();
}
assertEquals(3, metricTimeSeriesCnt);
}
@Test @Test
public void testReadEntitiesMetricFilters() throws Exception { public void testReadEntitiesMetricFilters() throws Exception {
TimelineFilterList list1 = new TimelineFilterList(); TimelineFilterList list1 = new TimelineFilterList();
@ -1400,7 +1469,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1413,7 +1482,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1432,7 +1502,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1451,7 +1521,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList2) new TimelineEntityFilters.Builder().metricFilters(metricFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList( TimelineFilterList metricFilterList3 = new TimelineFilterList(
@ -1463,7 +1533,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList3) new TimelineEntityFilters.Builder().metricFilters(metricFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList4 = new TimelineFilterList( TimelineFilterList metricFilterList4 = new TimelineFilterList(
@ -1475,7 +1545,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList4) new TimelineEntityFilters.Builder().metricFilters(metricFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList5 = new TimelineFilterList( TimelineFilterList metricFilterList5 = new TimelineFilterList(
@ -1487,7 +1557,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList5) new TimelineEntityFilters.Builder().metricFilters(metricFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
null)); null, null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }
@ -1504,7 +1574,7 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, list, null, null)); new TimelineDataToRetrieve(null, list, null, null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1534,7 +1604,7 @@ public class TestHBaseTimelineStorageEntities {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve( new TimelineDataToRetrieve(
null, metricsToRetrieve, EnumSet.of(Field.METRICS), null)); null, metricsToRetrieve, EnumSet.of(Field.METRICS), null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1553,8 +1623,8 @@ public class TestHBaseTimelineStorageEntities {
"world", null), "world", null),
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, new TimelineDataToRetrieve(null, metricsToRetrieve,
metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE)); EnumSet.of(Field.METRICS), Integer.MAX_VALUE, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
int metricValCnt = 0; int metricValCnt = 0;
@ -1588,7 +1658,8 @@ public class TestHBaseTimelineStorageEntities {
new TimelineReaderContext("cluster1", "user1", "some_flow_name", new TimelineReaderContext("cluster1", "user1", "some_flow_name",
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(), new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int infoCnt = 0; int infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1604,7 +1675,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList1) new TimelineEntityFilters.Builder().infoFilters(infoFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
infoCnt = 0; infoCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -1622,7 +1694,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList2) new TimelineEntityFilters.Builder().infoFilters(infoFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList3 = new TimelineFilterList( TimelineFilterList infoFilterList3 = new TimelineFilterList(
@ -1633,7 +1706,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList3) new TimelineEntityFilters.Builder().infoFilters(infoFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList4 = new TimelineFilterList( TimelineFilterList infoFilterList4 = new TimelineFilterList(
@ -1644,7 +1718,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList4) new TimelineEntityFilters.Builder().infoFilters(infoFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList infoFilterList5 = new TimelineFilterList( TimelineFilterList infoFilterList5 = new TimelineFilterList(
@ -1655,7 +1730,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", null), 1002345678919L, "application_1231111111_1111", "world", null),
new TimelineEntityFilters.Builder().infoFilters(infoFilterList5) new TimelineEntityFilters.Builder().infoFilters(infoFilterList5)
.build(), .build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null,
null, null));
assertEquals(3, entities.size()); assertEquals(3, entities.size());
} }


@ -584,7 +584,8 @@ public class TestHBaseStorageFlowRun {
TimelineEntity entity = hbr.getEntity( TimelineEntity entity = hbr.getEntity(
new TimelineReaderContext(cluster, user, flow, 1002345678919L, null, new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null), TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); new TimelineDataToRetrieve(null, metricsToRetrieve, null, null, null,
null));
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics(); Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(1, metrics.size()); assertEquals(1, metrics.size());
@ -609,7 +610,8 @@ public class TestHBaseStorageFlowRun {
new TimelineReaderContext(cluster, user, flow, null, null, new TimelineReaderContext(cluster, user, flow, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null), TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, metricsToRetrieve, null, null)); new TimelineDataToRetrieve(null, metricsToRetrieve, null, null, null,
null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
@ -681,7 +683,7 @@ public class TestHBaseStorageFlowRun {
TimelineEntityType.YARN_FLOW_RUN.toString(), null), TimelineEntityType.YARN_FLOW_RUN.toString(), null),
new TimelineEntityFilters.Builder().build(), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) { for (TimelineEntity timelineEntity : entities) {
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics(); Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
@ -948,7 +950,7 @@ public class TestHBaseStorageFlowRun {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList) new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
int metricCnt = 0; int metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -966,7 +968,7 @@ public class TestHBaseStorageFlowRun {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList1) new TimelineEntityFilters.Builder().metricFilters(metricFilterList1)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(1, entities.size()); assertEquals(1, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {
@ -983,7 +985,7 @@ public class TestHBaseStorageFlowRun {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList2) new TimelineEntityFilters.Builder().metricFilters(metricFilterList2)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList metricFilterList3 = new TimelineFilterList( TimelineFilterList metricFilterList3 = new TimelineFilterList(
@ -994,7 +996,7 @@ public class TestHBaseStorageFlowRun {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList3) new TimelineEntityFilters.Builder().metricFilters(metricFilterList3)
.build(), .build(),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.METRICS), null)); EnumSet.of(Field.METRICS), null, null, null));
assertEquals(0, entities.size()); assertEquals(0, entities.size());
TimelineFilterList list3 = new TimelineFilterList(); TimelineFilterList list3 = new TimelineFilterList();
@ -1016,7 +1018,7 @@ public class TestHBaseStorageFlowRun {
new TimelineEntityFilters.Builder().metricFilters(metricFilterList4) new TimelineEntityFilters.Builder().metricFilters(metricFilterList4)
.build(), .build(),
new TimelineDataToRetrieve(null, metricsToRetrieve, new TimelineDataToRetrieve(null, metricsToRetrieve,
EnumSet.of(Field.ALL), null)); EnumSet.of(Field.ALL), null, null, null));
assertEquals(2, entities.size()); assertEquals(2, entities.size());
metricCnt = 0; metricCnt = 0;
for (TimelineEntity entity : entities) { for (TimelineEntity entity : entities) {


@ -192,7 +192,6 @@ public class ColumnHelper<T> {
NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap = NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
resultMap.get(columnFamilyBytes); resultMap.get(columnFamilyBytes);
// could be that there is no such column family. // could be that there is no such column family.
if (columnCellMap != null) { if (columnCellMap != null) {
for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap


@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -342,4 +343,20 @@ public final class HBaseTimelineStorageUtils {
return (obj instanceof Short) || (obj instanceof Integer) || return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long); (obj instanceof Long);
} }
public static void setMetricsTimeRange(Query query, byte[] metricsCf,
long tsBegin, long tsEnd) {
if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) {
long supplementedTsBegin = tsBegin == 0 ? 0 :
TimestampGenerator.getSupplementedTimestamp(tsBegin, null);
long supplementedTsEnd =
(tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE :
TimestampGenerator.getSupplementedTimestamp(tsEnd + 1, null);
// Handle overflow by resetting time begin to 0 and time end to
// Long#MAX_VALUE, if required.
query.setColumnFamilyTimeRange(metricsCf,
((supplementedTsBegin < 0) ? 0 : supplementedTsBegin),
((supplementedTsEnd < 0) ? Long.MAX_VALUE : supplementedTsEnd));
}
}
} }
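Editorial note: setMetricsTimeRange only narrows the HBase column-family time range when a non-default window is supplied. The begin timestamp is supplemented as-is, the end timestamp is supplemented from tsEnd + 1 (HBase treats the upper bound as exclusive, so cells written exactly at tsEnd stay visible), and any overflow from the supplementing step is clamped back to 0 and Long.MAX_VALUE. Below is a minimal, self-contained sketch of that clamping idea; the TS_MULTIPLIER constant and the supplement() helper are stand-ins for TimestampGenerator.getSupplementedTimestamp and are assumptions for illustration, not the actual implementation.

// Sketch only: mirrors the clamping in setMetricsTimeRange under the
// assumption that supplementing a timestamp multiplies it by a fixed
// factor (TS_MULTIPLIER below), which can overflow to a negative long.
public final class MetricsTimeRangeSketch {
  private static final long TS_MULTIPLIER = 1_000_000L; // assumed factor

  private static long supplement(long ts) {
    return ts * TS_MULTIPLIER; // stand-in for TimestampGenerator
  }

  /** Returns {begin, end} suitable for Query#setColumnFamilyTimeRange. */
  public static long[] toCellTimeRange(long tsBegin, long tsEnd) {
    long begin = (tsBegin == 0) ? 0 : supplement(tsBegin);
    // The end bound is exclusive in HBase, so supplement tsEnd + 1
    // to keep cells stamped exactly at tsEnd.
    long end = (tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : supplement(tsEnd + 1);
    // Clamp overflow back to the full range, as the utility does.
    return new long[] {
        (begin < 0) ? 0 : begin,
        (end < 0) ? Long.MAX_VALUE : end
    };
  }
}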


@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -315,6 +316,8 @@ class ApplicationEntityReader extends GenericEntityReader {
context.getFlowName(), context.getFlowRunId(), context.getAppId()); context.getFlowName(), context.getFlowRunId(), context.getAppId());
byte[] rowKey = applicationRowKey.getRowKey(); byte[] rowKey = applicationRowKey.getRowKey();
Get get = new Get(rowKey); Get get = new Get(rowKey);
// Set time range for metric values.
setMetricsTimeRange(get);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) { if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList); get.setFilter(filterList);
@ -357,6 +360,14 @@ class ApplicationEntityReader extends GenericEntityReader {
} }
} }
private void setMetricsTimeRange(Query query) {
// Set time range for metric values.
HBaseTimelineStorageUtils.
setMetricsTimeRange(query, ApplicationColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
}
@Override @Override
protected ResultScanner getResults(Configuration hbaseConf, protected ResultScanner getResults(Configuration hbaseConf,
Connection conn, FilterList filterList) throws IOException { Connection conn, FilterList filterList) throws IOException {
@ -405,6 +416,9 @@ class ApplicationEntityReader extends GenericEntityReader {
newList.addFilter(filterList); newList.addFilter(filterList);
} }
scan.setFilter(newList); scan.setFilter(newList);
// Set time range for metric values.
setMetricsTimeRange(scan);
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
return getTable().getResultScanner(hbaseConf, conn, scan); return getTable().getResultScanner(hbaseConf, conn, scan);
} }
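Editorial note: the reader applies the metrics window to both the single-entity Get and the multi-entity Scan, and metricslimit still maps to the max-versions setting, so HBase first drops metric cells whose cell timestamps fall outside the requested window and then caps how many of the surviving versions are returned per column. A hedged sketch of that combination on a Get is below; the row key and column family name are placeholders, not the real schema constants.

// Sketch: how a column-family time range and a versions limit combine
// on an HBase Get. Placeholders are used for the row key and family.
import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public final class MetricsReadSketch {
  public static Get metricsGet(byte[] rowKey, long begin, long end,
      int metricsLimit) throws IOException {
    Get get = new Get(rowKey);
    byte[] metricsCf = Bytes.toBytes("m"); // placeholder family name
    // Only cells whose HBase cell timestamp lies in [begin, end) survive.
    get.setColumnFamilyTimeRange(metricsCf, begin, end);
    // Of the surviving cells, return at most metricsLimit versions per column.
    get.setMaxVersions(metricsLimit);
    return get;
  }
}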


@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -434,8 +435,8 @@ class GenericEntityReader extends TimelineEntityReader {
context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(), context.getAppId(), context.getEntityType(),
context.getEntityIdPrefix(), context.getEntityId()).getRowKey(); context.getEntityIdPrefix(), context.getEntityId()).getRowKey();
Get get = new Get(rowKey); Get get = new Get(rowKey);
setMetricsTimeRange(get);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) { if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList); get.setFilter(filterList);
@ -468,6 +469,14 @@ class GenericEntityReader extends TimelineEntityReader {
return result; return result;
} }
private void setMetricsTimeRange(Query query) {
// Set time range for metric values.
HBaseTimelineStorageUtils.
setMetricsTimeRange(query, EntityColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
}
@Override @Override
protected ResultScanner getResults(Configuration hbaseConf, Connection conn, protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException { FilterList filterList) throws IOException {
@ -513,6 +522,7 @@ class GenericEntityReader extends TimelineEntityReader {
// mode. // mode.
filterList.addFilter(new PageFilter(getFilters().getLimit())); filterList.addFilter(new PageFilter(getFilters().getLimit()));
} }
setMetricsTimeRange(scan);
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) { if (filterList != null && !filterList.getFilters().isEmpty()) {
scan.setFilter(filterList); scan.setFilter(filterList);


@ -57,6 +57,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Fiel
* metricsToRetrieve is specified, this limit defines an upper limit to the * metricsToRetrieve is specified, this limit defines an upper limit to the
* number of metrics to return. This parameter is ignored if METRICS are not to * number of metrics to return. This parameter is ignored if METRICS are not to
* be fetched.</li> * be fetched.</li>
* <li><b>metricsTimeStart</b> - Metric values before this timestamp would not
* be retrieved. If null or {@literal <0}, defaults to 0.</li>
* <li><b>metricsTimeEnd</b> - Metric values after this timestamp would not
* be retrieved. If null or {@literal <0}, defaults to {@link Long#MAX_VALUE}.
* </li>
* </ul> * </ul>
*/ */
@Private @Private
@ -66,6 +71,10 @@ public class TimelineDataToRetrieve {
private TimelineFilterList metricsToRetrieve; private TimelineFilterList metricsToRetrieve;
private EnumSet<Field> fieldsToRetrieve; private EnumSet<Field> fieldsToRetrieve;
private Integer metricsLimit; private Integer metricsLimit;
private Long metricsTimeBegin;
private Long metricsTimeEnd;
private static final long DEFAULT_METRICS_BEGIN_TIME = 0L;
private static final long DEFAULT_METRICS_END_TIME = Long.MAX_VALUE;
/** /**
* Default limit of number of metrics to return. * Default limit of number of metrics to return.
@ -73,12 +82,12 @@ public class TimelineDataToRetrieve {
public static final Integer DEFAULT_METRICS_LIMIT = 1; public static final Integer DEFAULT_METRICS_LIMIT = 1;
public TimelineDataToRetrieve() { public TimelineDataToRetrieve() {
this(null, null, null, null); this(null, null, null, null, null, null);
} }
public TimelineDataToRetrieve(TimelineFilterList confs, public TimelineDataToRetrieve(TimelineFilterList confs,
TimelineFilterList metrics, EnumSet<Field> fields, TimelineFilterList metrics, EnumSet<Field> fields,
Integer limitForMetrics) { Integer limitForMetrics, Long metricTimeBegin, Long metricTimeEnd) {
this.confsToRetrieve = confs; this.confsToRetrieve = confs;
this.metricsToRetrieve = metrics; this.metricsToRetrieve = metrics;
this.fieldsToRetrieve = fields; this.fieldsToRetrieve = fields;
@ -91,6 +100,20 @@ public class TimelineDataToRetrieve {
if (this.fieldsToRetrieve == null) { if (this.fieldsToRetrieve == null) {
this.fieldsToRetrieve = EnumSet.noneOf(Field.class); this.fieldsToRetrieve = EnumSet.noneOf(Field.class);
} }
if (metricTimeBegin == null || metricTimeBegin < 0) {
this.metricsTimeBegin = DEFAULT_METRICS_BEGIN_TIME;
} else {
this.metricsTimeBegin = metricTimeBegin;
}
if (metricTimeEnd == null || metricTimeEnd < 0) {
this.metricsTimeEnd = DEFAULT_METRICS_END_TIME;
} else {
this.metricsTimeEnd = metricTimeEnd;
}
if (this.metricsTimeBegin > this.metricsTimeEnd) {
throw new IllegalArgumentException("metricstimebegin should not be " +
"greater than metricstimeend");
}
} }
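Editorial note: in other words, a null or negative begin falls back to 0, a null or negative end falls back to Long.MAX_VALUE, and an inverted window is rejected with an IllegalArgumentException. A small usage sketch of the widened constructor, mirroring the arguments used by the tests above (values are illustrative; the import paths are best-effort assumptions about the package layout):

import java.util.EnumSet;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

public final class DataToRetrieveSketch {
  public static TimelineDataToRetrieve lastFortySeconds() {
    long now = System.currentTimeMillis();
    // METRICS only, up to 100 values per metric, limited to the last 40s.
    return new TimelineDataToRetrieve(
        null,                      // confsToRetrieve
        null,                      // metricsToRetrieve
        EnumSet.of(Field.METRICS), // fieldsToRetrieve
        100,                       // metricsLimit
        now - 40_000L,             // metricsTimeStart
        now);                      // metricsTimeEnd
  }
}

Passing null for both time bounds keeps the previous behaviour (the full range is read).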
public TimelineFilterList getConfsToRetrieve() { public TimelineFilterList getConfsToRetrieve() {
@ -137,6 +160,14 @@ public class TimelineDataToRetrieve {
return metricsLimit; return metricsLimit;
} }
public Long getMetricsTimeBegin() {
return this.metricsTimeBegin;
}
public Long getMetricsTimeEnd() {
return metricsTimeEnd;
}
public void setMetricsLimit(Integer limit) { public void setMetricsLimit(Integer limit) {
if (limit == null || limit < 1) { if (limit == null || limit < 1) {
this.metricsLimit = DEFAULT_METRICS_LIMIT; this.metricsLimit = DEFAULT_METRICS_LIMIT;


@ -265,6 +265,11 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entities
* would not contain metric values before this timestamp(Optional query
* param).
* @param metricsTimeEnd If specified, returned metrics for the entities would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of entities from the * @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified * given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID * fromId. fromId should be taken from the value associated with FROM_ID
@ -300,6 +305,8 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -326,7 +333,8 @@ public class TimelineReaderWebServices {
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
fromId), fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or flowrunid"); "createdTime start/end or limit or flowrunid");
@ -407,6 +415,11 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entities
* would not contain metric values before this timestamp(Optional query
* param).
* @param metricsTimeEnd If specified, returned metrics for the entities would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of entities from the * @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified * given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID * fromId. fromId should be taken from the value associated with FROM_ID
@ -447,12 +460,14 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, appId, entityType, userId, flowName, return getEntities(req, res, null, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
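Editorial note: at the REST layer the two new parameters are ordinary query params alongside metricslimit, so a client simply appends metricstimestart and metricstimeend (epoch milliseconds) to an entities or app query. A hedged Java sketch of such a call follows; the host, port, and URL path are illustrative assumptions about a typical timeline reader deployment, not values taken from this patch (the app id and entity type reuse the test data above).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public final class MetricsWindowQuery {
  public static void main(String[] args) throws Exception {
    long end = System.currentTimeMillis();
    long start = end - 40_000L;
    // Assumed reader address and path; adjust to the actual deployment.
    URL url = new URL("http://localhost:8188/ws/v2/timeline/apps/"
        + "application_1231111111_1111/entities/world"
        + "?fields=METRICS&metricslimit=100"
        + "&metricstimestart=" + start
        + "&metricstimeend=" + end);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // JSON entities with windowed metric values
      }
    } finally {
      conn.disconnect();
    }
  }
}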
/** /**
@ -523,6 +538,11 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entities
* would not contain metric values before this timestamp(Optional query
* param).
* @param metricsTimeEnd If specified, returned metrics for the entities would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of entities from the * @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified * given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID * fromId. fromId should be taken from the value associated with FROM_ID
@ -564,6 +584,8 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -586,7 +608,8 @@ public class TimelineReaderWebServices {
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
fromId), fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or flowrunid"); "createdTime start/end or limit or flowrunid");
@ -628,6 +651,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entity would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the entity would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -649,7 +676,9 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString()); QUERY_STRING_SEP + req.getQueryString());
@ -669,7 +698,8 @@ public class TimelineReaderWebServices {
} }
entity = timelineReaderManager.getEntity(context, entity = timelineReaderManager.getEntity(context,
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -723,6 +753,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entity would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the entity would
* not contain metric values after this timestamp(Optional query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -752,10 +786,12 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, null, appId, entityType, entityId, userId, return getEntity(req, res, null, appId, entityType, entityId, userId,
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit, entityIdPrefix); metricsLimit, metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**
@ -797,6 +833,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the entity would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the entity would
* not contain metric values after this timestamp(Optional query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -827,6 +867,8 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -845,7 +887,8 @@ public class TimelineReaderWebServices {
clusterId, userId, flowName, flowRunId, appId, entityType, clusterId, userId, flowName, flowRunId, appId, entityType,
entityIdPrefix, entityId), entityIdPrefix, entityId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -912,7 +955,7 @@ public class TimelineReaderWebServices {
context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString()); context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString());
entity = timelineReaderManager.getEntity(context, entity = timelineReaderManager.getEntity(context,
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, null, null)); null, metricsToRetrieve, null, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1021,7 +1064,7 @@ public class TimelineReaderWebServices {
clusterId, userId, flowName, flowRunId, null, clusterId, userId, flowName, flowRunId, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, null, null)); null, metricsToRetrieve, null, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1115,7 +1158,7 @@ public class TimelineReaderWebServices {
limit, createdTimeStart, createdTimeEnd, null, null, null, limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, fromId), null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null)); null, metricsToRetrieve, fields, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or fromId"); "createdTime start/end or limit or fromId");
@ -1265,7 +1308,7 @@ public class TimelineReaderWebServices {
limit, createdTimeStart, createdTimeEnd, null, null, null, limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, fromId), null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null)); null, metricsToRetrieve, fields, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or fromId"); "createdTime start/end or limit or fromId");
@ -1400,7 +1443,7 @@ public class TimelineReaderWebServices {
clusterId, null, null, null, null, clusterId, null, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null),
entityFilters, TimelineReaderWebServicesUtils. entityFilters, TimelineReaderWebServicesUtils.
createTimelineDataToRetrieve(null, null, null, null)); createTimelineDataToRetrieve(null, null, null, null, null, null));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "limit"); handleException(e, url, startTime, "limit");
} }
@ -1441,6 +1484,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -1462,7 +1509,9 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString()); QUERY_STRING_SEP + req.getQueryString());
@ -1483,7 +1532,8 @@ public class TimelineReaderWebServices {
context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString()); context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
entity = timelineReaderManager.getEntity(context, entity = timelineReaderManager.getEntity(context,
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1532,6 +1582,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the app would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the app would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -1556,9 +1610,12 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
return getApp(req, res, null, appId, flowName, flowRunId, userId, return getApp(req, res, null, appId, flowName, flowRunId, userId,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd);
} }
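On the wire these are plain epoch-millisecond values appended to the existing app endpoint. A hedged usage sketch; the host, port, base path /ws/v2/timeline and the application id below are illustrative values, not taken from this patch:

    // Illustrative only: fetch one app's metrics restricted to a time window.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class AppMetricsWindowQuery {
      public static void main(String[] args) throws Exception {
        String url = "http://timelinereader.example.com:8188/ws/v2/timeline/apps/"
            + "application_1111111111_1111"
            + "?fields=METRICS&metricslimit=100"
            + "&metricstimestart=1425016501000&metricstimeend=1425016502000";
        HttpURLConnection conn =
            (HttpURLConnection) new URL(url).openConnection();
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream()))) {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line); // JSON TimelineEntity with windowed metrics
          }
        }
      }
    }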
/** /**
@ -1596,6 +1653,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the app would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the app would
* not contain metric values after this timestamp(Optional query param).
* *
* @return If successful, a HTTP 200(OK) response having a JSON representing a * @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br> * <cite>TimelineEntity</cite> instance is returned.<br>
@ -1621,7 +1682,9 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) { @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString()); QUERY_STRING_SEP + req.getQueryString());
@ -1639,7 +1702,8 @@ public class TimelineReaderWebServices {
clusterId, userId, flowName, flowRunId, appId, clusterId, userId, flowName, flowRunId, appId,
TimelineEntityType.YARN_APPLICATION.toString(), null, null), TimelineEntityType.YARN_APPLICATION.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
} }
@ -1712,6 +1776,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -1746,6 +1814,8 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -1771,7 +1841,8 @@ public class TimelineReaderWebServices {
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
fromId), fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, handleException(e, url, startTime,
"createdTime start/end or limit or flowrunid"); "createdTime start/end or limit or flowrunid");
@ -1845,6 +1916,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -1881,12 +1956,15 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, null, return getEntities(req, res, null, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
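The listing endpoints take the same pair next to the existing limiting and paging parameters. A sketch of composing such a query for a flow run's applications; the path segments and values are assumptions for illustration:

    // Illustrative only: a flow run's apps, at most 10 metric values per
    // metric, and only values from the last hour.
    static String flowRunAppsQuery(String user, String flow, long runId) {
      long now = System.currentTimeMillis();
      return String.format(
          "/ws/v2/timeline/users/%s/flows/%s/runs/%d/apps"
              + "?limit=50&fields=METRICS&metricslimit=10"
              + "&metricstimestart=%d&metricstimeend=%d",
          user, flow, runId, now - 3_600_000L, now);
    }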
/** /**
@ -1950,6 +2028,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -1988,12 +2070,15 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, null, return getEntities(req, res, clusterId, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2054,6 +2139,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -2089,12 +2178,15 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, null, return getEntities(req, res, null, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2156,6 +2248,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then * or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of * metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param). * metric(s) will be returned. (Optional query param).
* @param metricsTimeStart If specified, returned metrics for the apps would
* not contain metric values before this timestamp(Optional query param).
* @param metricsTimeEnd If specified, returned metrics for the apps would
* not contain metric values after this timestamp(Optional query param).
* @param fromId If specified, retrieve the next set of applications * @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive * from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated * of specified fromId. fromId should be taken from the value associated
@ -2192,12 +2288,15 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, null, return getEntities(req, res, clusterId, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId); confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2268,6 +2367,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempts
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempts
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of application-attempt * @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt * entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2306,12 +2411,15 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId, return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId,
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve, infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve,
metricsToRetrieve, fields, metricsLimit, fromId); metricsToRetrieve, fields, metricsLimit, metricsTimeStart,
metricsTimeEnd, fromId);
} }
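Both parameters are optional and independent, so a query may pass only metricstimestart to drop everything older than a cut-off. A sketch against the application-attempt listing; the path and cut-off are illustrative:

    // Illustrative only: app attempts of one application, keeping metric
    // values recorded on or after the cut-off; metricstimeend is omitted.
    static String appAttemptsSince(String appId, long cutoffMillis) {
      return "/ws/v2/timeline/apps/" + appId + "/appattempts"
          + "?fields=METRICS&metricstimestart=" + cutoffMillis;
    }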
/** /**
@ -2383,6 +2491,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempts
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempts
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of application-attempt * @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt * entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2422,6 +2536,8 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, appId, return getEntities(req, res, clusterId, appId,
@ -2429,7 +2545,7 @@ public class TimelineReaderWebServices {
flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2472,6 +2588,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempt
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempt
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2499,10 +2621,12 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName, return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName,
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
entityIdPrefix); metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**
@ -2545,6 +2669,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the app attempt
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the app attempt
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2574,11 +2704,13 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, clusterId, appId, return getEntity(req, res, clusterId, appId,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId,
userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit, entityIdPrefix); metricsLimit, metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
/** /**
@ -2651,6 +2783,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the containers
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the containers
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of container * @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container * entities from the given fromId. The set of container
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2690,12 +2828,14 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
return getContainers(req, res, null, appId, appattemptId, userId, flowName, return getContainers(req, res, null, appId, appattemptId, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2769,6 +2909,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the containers
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the containers
* would not contain metric values after this timestamp(Optional
* query param).
* @param fromId If specified, retrieve the next set of container * @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container * entities from the given fromId. The set of container
* entities retrieved is inclusive of specified fromId. fromId should * entities retrieved is inclusive of specified fromId. fromId should
@ -2810,6 +2956,8 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("fromid") String fromId) { @QueryParam("fromid") String fromId) {
String entityType = TimelineEntityType.YARN_CONTAINER.toString(); String entityType = TimelineEntityType.YARN_CONTAINER.toString();
@ -2829,7 +2977,7 @@ public class TimelineReaderWebServices {
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilter, conffilters, metricfilters, eventfilters, isRelatedTo, infofilter, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId); metricsTimeStart, metricsTimeEnd, fromId);
} }
/** /**
@ -2871,6 +3019,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the container
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the container
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2898,10 +3052,12 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getContainer(req, res, null, appId, containerId, userId, flowName, return getContainer(req, res, null, appId, containerId, userId, flowName,
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
entityIdPrefix); entityIdPrefix, metricsTimeStart, metricsTimeEnd);
} }
/** /**
@ -2944,6 +3100,12 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1 * have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional * i.e. latest single value of metric(s) will be returned. (Optional
* query param). * query param).
* @param metricsTimeStart If specified, returned metrics for the container
* would not contain metric values before this timestamp(Optional
* query param).
* @param metricsTimeEnd If specified, returned metrics for the container
* would not contain metric values after this timestamp(Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched. * @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster. * If specified, then entity retrieval will be faster.
* *
@ -2973,11 +3135,13 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields, @QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit, @QueryParam("metricslimit") String metricsLimit,
@QueryParam("metricstimestart") String metricsTimeStart,
@QueryParam("metricstimeend") String metricsTimeEnd,
@QueryParam("entityidprefix") String entityIdPrefix) { @QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, clusterId, appId, return getEntity(req, res, clusterId, appId,
TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId, TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId,
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit, entityIdPrefix); metricsLimit, metricsTimeStart, metricsTimeEnd, entityIdPrefix);
} }
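The cluster-qualified container endpoints accept the same window. A hedged sketch; the cluster, application and container ids are placeholders and the path layout is assumed from the endpoint names above:

    // Illustrative only: one container entity with metrics limited to a
    // window, addressed through the cluster-qualified path.
    static String containerInWindow(String cluster, String appId,
        String containerId, long start, long end) {
      return "/ws/v2/timeline/clusters/" + cluster + "/apps/" + appId
          + "/containers/" + containerId
          + "?fields=METRICS&metricstimestart=" + start
          + "&metricstimeend=" + end;
    }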
/** /**
@ -127,11 +127,13 @@ final class TimelineReaderWebServicesUtils {
* @throws TimelineParseException if any problem occurs during parsing. * @throws TimelineParseException if any problem occurs during parsing.
*/ */
static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs, static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs,
String metrics, String fields, String metricsLimit) String metrics, String fields, String metricsLimit,
String metricsTimeBegin, String metricsTimeEnd)
throws TimelineParseException { throws TimelineParseException {
return new TimelineDataToRetrieve(parseDataToRetrieve(confs), return new TimelineDataToRetrieve(parseDataToRetrieve(confs),
parseDataToRetrieve(metrics), parseFieldsStr(fields, parseDataToRetrieve(metrics), parseFieldsStr(fields,
TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit)); TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit),
parseLongStr(metricsTimeBegin), parseLongStr(metricsTimeEnd));
} }
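Both values reach this helper as strings from the query layer and are parsed to Long via parseLongStr before being handed to TimelineDataToRetrieve. Since the helper is package-private, a hedged sketch of the equivalent direct construction, with the argument order taken from the call above:

    // Illustrative only: the equivalent of
    // createTimelineDataToRetrieve(null, null, "METRICS", "100",
    //     "1425016501000", "1425016502000").
    TimelineDataToRetrieve dtr = new TimelineDataToRetrieve(
        null,                       // configs to retrieve
        null,                       // metrics to retrieve
        EnumSet.of(Field.METRICS),  // fields
        100,                        // metricsLimit
        1425016501000L,             // metricsTimeBegin
        1425016502000L);            // metricsTimeEnd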
/** /**
@ -319,7 +319,7 @@ public class TestFileSystemTimelineReaderImpl {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", "id_1"), "app", "id_1"),
new TimelineDataToRetrieve(null, null, null, null)); new TimelineDataToRetrieve(null, null, null, null, null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -334,7 +334,7 @@ public class TestFileSystemTimelineReaderImpl {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", null, null, null, "app1", "app", new TimelineReaderContext("cluster1", null, null, null, "app1", "app",
"id_1"), "id_1"),
new TimelineDataToRetrieve(null, null, null, null)); new TimelineDataToRetrieve(null, null, null, null, null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -351,7 +351,7 @@ public class TestFileSystemTimelineReaderImpl {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", null, null, null, "app2", new TimelineReaderContext("cluster1", null, null, null, "app2",
"app", "id_5"), "app", "id_5"),
new TimelineDataToRetrieve(null, null, null, null)); new TimelineDataToRetrieve(null, null, null, null, null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_5")).toString(), (new TimelineEntity.Identifier("app", "id_5")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -365,7 +365,8 @@ public class TestFileSystemTimelineReaderImpl {
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", "id_1"), "app", "id_1"),
new TimelineDataToRetrieve(null, null, new TimelineDataToRetrieve(null, null,
EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null)); EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null, null,
null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -383,7 +384,8 @@ public class TestFileSystemTimelineReaderImpl {
TimelineEntity result = reader.getEntity( TimelineEntity result = reader.getEntity(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", "id_1"), "app", "id_1"),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
Assert.assertEquals( Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(), (new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString()); result.getIdentifier().toString());
@ -399,7 +401,8 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result = reader.getEntities( Set<TimelineEntity> result = reader.getEntities(
new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
"app", null), new TimelineEntityFilters.Builder().build(), "app", null), new TimelineEntityFilters.Builder().build(),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null,
null, null));
// All 4 entities will be returned // All 4 entities will be returned
Assert.assertEquals(4, result.size()); Assert.assertEquals(4, result.size());
} }
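The updated tests always pass null for the two new constructor arguments, i.e. no windowing. A sketch of what a windowed retrieval would look like in the same style; the timestamps and the expectation comment are assumptions, not assertions from this patch:

    // Illustrative only: ask the reader for metric values from the last minute.
    long now = System.currentTimeMillis();
    TimelineEntity result = reader.getEntity(
        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
            "app", "id_1"),
        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS), null,
            now - 60_000L, now));
    // Each returned TimelineMetric should then only carry values whose
    // timestamps fall inside the requested window.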