YARN-6256. Add FROM_ID info key for timeline entities in reader response (Rohith Sharma K S via Varun Saxena)

This commit is contained in:
Varun Saxena 2017-03-07 23:54:38 +05:30
parent f9a18eac12
commit 0b076bc509
14 changed files with 445 additions and 261 deletions

View File

@ -825,7 +825,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
assertEquals(2, entities1.size());
for (TimelineEntity entity : entities1) {
assertNotNull(entity.getInfo());
assertEquals(1, entity.getInfo().size());
assertEquals(2, entity.getInfo().size());
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
@ -853,7 +853,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
assertEquals(2, entities2.size());
for (TimelineEntity entity : entities2) {
assertNotNull(entity.getInfo());
assertEquals(1, entity.getInfo().size());
assertEquals(2, entity.getInfo().size());
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
@ -1417,8 +1417,9 @@ public void testGetEntitiesInfoFilters() throws Exception {
infoCnt += entity.getInfo().size();
assertEquals("entity2", entity.getId());
}
// Includes UID in info field even if fields not specified as INFO.
assertEquals(1, infoCnt);
// Includes UID and FROM_ID in info field even if fields not specified as
// INFO.
assertEquals(2, infoCnt);
// infofilters=(info1 eq cluster1 AND info4 eq 35000) OR
// (info1 eq cluster2 AND info2 eq 2.0)
@ -1436,8 +1437,8 @@ public void testGetEntitiesInfoFilters() throws Exception {
infoCnt += entity.getInfo().size();
assertEquals("entity2", entity.getId());
}
// Includes UID in info field.
assertEquals(4, infoCnt);
// Includes UID and FROM_ID in info field.
assertEquals(5, infoCnt);
// Test for behavior when compare op is ne(not equals) vs ene
// (exists and not equals). info3 does not exist for entity2. For ne,
@ -2171,8 +2172,8 @@ public void testGenericEntitiesForPagination() throws Exception {
// verify for entity-10 to entity-7 in descending order.
TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix()
+ "&&fromid=" + entity.getId();
queryParam = "?limit=" + limit + "&fromid="
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
@ -2180,7 +2181,8 @@ public void testGenericEntitiesForPagination() throws Exception {
// verify for entity-7 to entity-4 in descending order.
entity = verifyPaginatedEntites(entities, limit, 7);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
queryParam = "?limit=" + limit + "&fromid="
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
@ -2188,7 +2190,8 @@ public void testGenericEntitiesForPagination() throws Exception {
// verify for entity-4 to entity-1 in descending order.
entity = verifyPaginatedEntites(entities, limit, 4);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
queryParam = "?limit=" + limit + "&fromid="
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
@ -2264,7 +2267,8 @@ public void testForFlowAppsPagination() throws Exception {
TimelineEntity entity10 = entities.get(limit - 1);
uri =
URI.create(resourceUri + queryParam + "&fromid=" + entity10.getId());
URI.create(resourceUri + queryParam + "&fromid="
+ entity10.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
@ -2308,7 +2312,8 @@ public void testForFlowRunAppsPagination() throws Exception {
TimelineEntity entity3 = entities.get(limit - 1);
uri =
URI.create(resourceUri + queryParam + "&fromid=" + entity3.getId());
URI.create(resourceUri + queryParam + "&fromid="
+ entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
@ -2352,7 +2357,7 @@ public void testForFlowRunsPagination() throws Exception {
TimelineEntity entity2 = entities.get(limit - 1);
uri = URI.create(resourceUri + queryParam + "&fromid="
+ entity2.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+ entity2.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
@ -2362,7 +2367,7 @@ public void testForFlowRunsPagination() throws Exception {
assertEquals(entity3, entities.get(1));
uri = URI.create(resourceUri + queryParam + "&fromid="
+ entity3.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+ entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});

View File

@ -389,6 +389,8 @@ public void testWriteApplicationToHBase() throws Exception {
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
Map<String, Object> infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparision.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, infoMap2);
Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
@ -448,6 +450,9 @@ public void testWriteApplicationToHBase() throws Exception {
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparison.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, e1.getInfo());
assertEquals(isRelatedTo, e1.getIsRelatedToEntities());
assertEquals(relatesTo, e1.getRelatesToEntities());
@ -680,7 +685,7 @@ public void testReadApps() throws Exception {
}
assertEquals(5, cfgCnt);
assertEquals(3, metricCnt);
assertEquals(5, infoCnt);
assertEquals(8, infoCnt);
assertEquals(4, eventCnt);
assertEquals(4, relatesToCnt);
assertEquals(4, isRelatedToCnt);
@ -744,7 +749,8 @@ public void testReadAppsDefaultView() throws Exception {
TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve());
assertNotNull(e1);
assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
assertEquals(1, e1.getInfo().size());
assertTrue(e1.getConfigs().isEmpty() &&
e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
e1.getRelatesToEntities().isEmpty());
Set<TimelineEntity> es1 = reader.getEntities(
@ -755,7 +761,8 @@ public void testReadAppsDefaultView() throws Exception {
new TimelineDataToRetrieve());
assertEquals(3, es1.size());
for (TimelineEntity e : es1) {
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
assertEquals(1, e1.getInfo().size());
assertTrue(e.getConfigs().isEmpty() &&
e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
e.getRelatesToEntities().isEmpty());
}
@ -788,7 +795,7 @@ public void testReadAppsByFields() throws Exception {
isRelatedToCnt += entity.getIsRelatedToEntities().size();
infoCnt += entity.getInfo().size();
}
assertEquals(0, infoCnt);
assertEquals(3, infoCnt);
assertEquals(4, isRelatedToCnt);
assertEquals(3, metricsCnt);
}
@ -1770,7 +1777,7 @@ public void testReadAppsInfoFilters() throws Exception {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(5, infoCnt);
assertEquals(7, infoCnt);
TimelineFilterList infoFilterList1 = new TimelineFilterList(
new TimelineKeyValueFilter(
@ -1787,7 +1794,7 @@ public void testReadAppsInfoFilters() throws Exception {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(3, infoCnt);
assertEquals(4, infoCnt);
TimelineFilterList infoFilterList2 = new TimelineFilterList(
new TimelineKeyValueFilter(

View File

@ -311,6 +311,8 @@ public void testWriteEntityToHBase() throws Exception {
assertEquals(type, e1.getType());
assertEquals(cTime, e1.getCreatedTime());
Map<String, Object> infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparison.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, infoMap2);
Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
@ -336,7 +338,10 @@ public void testWriteEntityToHBase() throws Exception {
assertEquals(id, e1.getId());
assertEquals(type, e1.getType());
assertEquals(cTime, e1.getCreatedTime());
assertEquals(infoMap, e1.getInfo());
infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparision.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, infoMap2);
assertEquals(isRelatedTo, e1.getIsRelatedToEntities());
assertEquals(relatesTo, e1.getRelatesToEntities());
assertEquals(conf, e1.getConfigs());
@ -573,7 +578,7 @@ public void testReadEntities() throws Exception {
}
assertEquals(5, cfgCnt);
assertEquals(3, metricCnt);
assertEquals(5, infoCnt);
assertEquals(8, infoCnt);
assertEquals(4, eventCnt);
assertEquals(4, relatesToCnt);
assertEquals(4, isRelatedToCnt);
@ -1126,7 +1131,8 @@ public void testReadEntitiesDefaultView() throws Exception {
1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve());
assertNotNull(e1);
assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
assertEquals(1, e1.getInfo().size());
assertTrue(e1.getConfigs().isEmpty() &&
e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
e1.getRelatesToEntities().isEmpty());
Set<TimelineEntity> es1 = reader.getEntities(
@ -1136,9 +1142,10 @@ public void testReadEntitiesDefaultView() throws Exception {
new TimelineDataToRetrieve());
assertEquals(3, es1.size());
for (TimelineEntity e : es1) {
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
assertTrue(e.getConfigs().isEmpty() &&
e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
e.getRelatesToEntities().isEmpty());
assertEquals(1, e.getInfo().size());
}
}
@ -1167,7 +1174,7 @@ public void testReadEntitiesByFields() throws Exception {
isRelatedToCnt += entity.getIsRelatedToEntities().size();
infoCnt += entity.getInfo().size();
}
assertEquals(0, infoCnt);
assertEquals(3, infoCnt);
assertEquals(4, isRelatedToCnt);
assertEquals(3, metricsCnt);
}
@ -1603,7 +1610,7 @@ public void testReadEntitiesInfoFilters() throws Exception {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(5, infoCnt);
assertEquals(7, infoCnt);
TimelineFilterList infoFilterList1 = new TimelineFilterList(
new TimelineKeyValueFilter(
@ -1619,7 +1626,7 @@ public void testReadEntitiesInfoFilters() throws Exception {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(3, infoCnt);
assertEquals(4, infoCnt);
TimelineFilterList infoFilterList2 = new TimelineFilterList(
new TimelineKeyValueFilter(

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
@ -33,7 +37,7 @@ public class ApplicationRowKey {
private final String flowName;
private final Long flowRunId;
private final String appId;
private final KeyConverter<ApplicationRowKey> appRowKeyConverter =
private final ApplicationRowKeyConverter appRowKeyConverter =
new ApplicationRowKeyConverter();
public ApplicationRowKey(String clusterId, String userId, String flowName,
@ -85,6 +89,24 @@ public static ApplicationRowKey parseRowKey(byte[] rowKey) {
return new ApplicationRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the application table as follows:
* {@code clusterId!userName!flowName!flowRunId!AppId}.
* @return String representation of row key.
*/
public String getRowKeyAsString() {
return appRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return A <cite>ApplicationRowKey</cite> object.
*/
public static ApplicationRowKey parseRowKeyFromString(String encodedRowKey) {
return new ApplicationRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* Encodes and decodes row key for application table. The row key is of the
* form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a long,
@ -93,7 +115,7 @@ public static ApplicationRowKey parseRowKey(byte[] rowKey) {
* <p>
*/
final private static class ApplicationRowKeyConverter implements
KeyConverter<ApplicationRowKey> {
KeyConverter<ApplicationRowKey>, KeyConverterToString<ApplicationRowKey> {
private final KeyConverter<String> appIDKeyConverter =
new AppIdKeyConverter();
@ -201,6 +223,29 @@ public ApplicationRowKey decode(byte[] rowKey) {
return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
appId);
}
@Override
public String encodeAsString(ApplicationRowKey key) {
if (key.clusterId == null || key.userId == null || key.flowName == null
|| key.flowRunId == null || key.appId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils
.joinAndEscapeStrings(new String[] {key.clusterId, key.userId,
key.flowName, key.flowRunId.toString(), key.appId});
}
@Override
public ApplicationRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 5) {
throw new IllegalArgumentException(
"Invalid row key for application table.");
}
Long flowRunId = Long.valueOf(split.get(3));
return new ApplicationRowKey(split.get(0), split.get(1), split.get(2),
flowRunId, split.get(4));
}
}
}

View File

@ -17,9 +17,13 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
@ -35,7 +39,7 @@ public class EntityRowKey {
private final String entityType;
private final Long entityIdPrefix;
private final String entityId;
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
private final EntityRowKeyConverter entityRowKeyConverter =
new EntityRowKeyConverter();
public EntityRowKey(String clusterId, String userId, String flowName,
@ -96,7 +100,6 @@ public byte[] getRowKey() {
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey byte representation of row key.
* @return An <cite>EntityRowKey</cite> object.
*/
@ -104,6 +107,27 @@ public static EntityRowKey parseRowKey(byte[] rowKey) {
return new EntityRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the entity table as follows:
* <p>
* {@code userName!clusterId!flowName!flowRunId!AppId!
* entityType!entityIdPrefix!entityId}.
* </p>
* @return String representation of row key.
*/
public String getRowKeyAsString() {
return entityRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return A <cite>EntityRowKey</cite> object.
*/
public static EntityRowKey parseRowKeyFromString(String encodedRowKey) {
return new EntityRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* Encodes and decodes row key for entity table. The row key is of the form :
* userName!clusterId!flowName!flowRunId!appId!entityType!entityId. flowRunId
@ -112,7 +136,7 @@ public static EntityRowKey parseRowKey(byte[] rowKey) {
* <p>
*/
final private static class EntityRowKeyConverter implements
KeyConverter<EntityRowKey> {
KeyConverter<EntityRowKey>, KeyConverterToString<EntityRowKey> {
private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter();
@ -245,5 +269,31 @@ public EntityRowKey decode(byte[] rowKey) {
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
entityType, entityPrefixId, entityId);
}
@Override
public String encodeAsString(EntityRowKey key) {
if (key.clusterId == null || key.userId == null || key.flowName == null
|| key.flowRunId == null || key.appId == null
|| key.entityType == null || key.entityIdPrefix == null
|| key.entityId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils
.joinAndEscapeStrings(new String[] {key.clusterId, key.userId,
key.flowName, key.flowRunId.toString(), key.appId, key.entityType,
key.entityIdPrefix.toString(), key.entityId});
}
@Override
public EntityRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 8) {
throw new IllegalArgumentException("Invalid row key for entity table.");
}
Long flowRunId = Long.valueOf(split.get(3));
Long entityIdPrefix = Long.valueOf(split.get(6));
return new EntityRowKey(split.get(0), split.get(1), split.get(2),
flowRunId, split.get(4), split.get(5), entityIdPrefix, split.get(7));
}
}
}

View File

@ -17,8 +17,12 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
@ -70,7 +74,6 @@ public byte[] getRowKey() {
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey Byte representation of row key.
* @return A <cite>FlowRunRowKey</cite> object.
*/
@ -78,6 +81,24 @@ public static FlowRunRowKey parseRowKey(byte[] rowKey) {
return new FlowRunRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the flow run table as follows:
* {@code clusterId!userId!flowName!Flow Run Id}.
* @return String representation of row key
*/
public String getRowKeyAsString() {
return flowRunRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return A <cite>FlowRunRowKey</cite> object.
*/
public static FlowRunRowKey parseRowKeyFromString(String encodedRowKey) {
return new FlowRunRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* returns the Flow Key as a verbose String output.
* @return String
@ -101,7 +122,7 @@ public String toString() {
* <p>
*/
final private static class FlowRunRowKeyConverter implements
KeyConverter<FlowRunRowKey> {
KeyConverter<FlowRunRowKey>, KeyConverterToString<FlowRunRowKey> {
private FlowRunRowKeyConverter() {
}
@ -186,5 +207,27 @@ public FlowRunRowKey decode(byte[] rowKey) {
LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
}
@Override
public String encodeAsString(FlowRunRowKey key) {
if (key.clusterId == null || key.userId == null || key.flowName == null
|| key.flowRunId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
key.clusterId, key.userId, key.flowName, key.flowRunId.toString()});
}
@Override
public FlowRunRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 4) {
throw new IllegalArgumentException(
"Invalid row key for flow run table.");
}
Long flowRunId = Long.valueOf(split.get(3));
return new FlowRunRowKey(split.get(0), split.get(1), split.get(2),
flowRunId);
}
}
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -48,11 +49,11 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import com.google.common.base.Preconditions;
@ -372,18 +373,17 @@ protected ResultScanner getResults(Configuration hbaseConf,
context.getFlowRunId());
scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
} else {
Long flowRunId = context.getFlowRunId();
if (flowRunId == null) {
AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(
getFilters().getFromId());
FlowContext flowContext = lookupFlowContext(appToFlowRowKey,
context.getClusterId(), hbaseConf, conn);
flowRunId = flowContext.getFlowRunId();
ApplicationRowKey applicationRowKey = null;
try {
applicationRowKey =
ApplicationRowKey.parseRowKeyFromString(getFilters().getFromId());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid filter fromid is provided.");
}
if (!context.getClusterId().equals(applicationRowKey.getClusterId())) {
throw new BadRequestException(
"fromid doesn't belong to clusterId=" + context.getClusterId());
}
ApplicationRowKey applicationRowKey =
new ApplicationRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), flowRunId, getFilters().getFromId());
// set start row
scan.setStartRow(applicationRowKey.getRowKey());
@ -497,6 +497,10 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
if (hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
}
ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(result.getRow());
entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
rowKey.getRowKeyAsString());
return entity;
}
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -217,11 +218,17 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
context.getUserId(), context.getFlowName());
scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
} else {
FlowRunRowKey flowRunRowKey =
new FlowRunRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), Long.parseLong(getFilters().getFromId()));
FlowRunRowKey flowRunRowKey = null;
try {
flowRunRowKey =
FlowRunRowKey.parseRowKeyFromString(getFilters().getFromId());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid filter fromid is provided.");
}
if (!context.getClusterId().equals(flowRunRowKey.getClusterId())) {
throw new BadRequestException(
"fromid doesn't belong to clusterId=" + context.getClusterId());
}
// set start row
scan.setStartRow(flowRunRowKey.getRowKey());
@ -247,16 +254,11 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
@Override
protected TimelineEntity parseEntity(Result result) throws IOException {
TimelineReaderContext context = getContext();
FlowRunEntity flowRun = new FlowRunEntity();
flowRun.setUser(context.getUserId());
flowRun.setName(context.getFlowName());
if (isSingleEntityRead()) {
flowRun.setRunId(context.getFlowRunId());
} else {
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
flowRun.setRunId(rowKey.getFlowRunId());
}
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
flowRun.setRunId(rowKey.getFlowRunId());
flowRun.setUser(rowKey.getUserId());
flowRun.setName(rowKey.getFlowName());
// read the start time
Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
@ -285,6 +287,8 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
// set the id
flowRun.setId(flowRun.getId());
flowRun.getInfo().put(TimelineReaderUtils.FROMID_KEY,
rowKey.getRowKeyAsString());
return flowRun;
}
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import com.google.common.base.Preconditions;
@ -475,19 +477,27 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
TimelineReaderContext context = getContext();
RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = null;
// default mode, will always scans from beginning of entity type.
if (getFilters() == null || getFilters().getFromIdPrefix() == null) {
if (getFilters() == null || getFilters().getFromId() == null) {
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(), null, null);
scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
} else { // pagination mode, will scan from given entityIdPrefix!enitityId
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(),
getFilters().getFromIdPrefix(), getFilters().getFromId());
EntityRowKey entityRowKey = null;
try {
entityRowKey =
EntityRowKey.parseRowKeyFromString(getFilters().getFromId());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid filter fromid is provided.");
}
if (!context.getClusterId().equals(entityRowKey.getClusterId())) {
throw new BadRequestException(
"fromid doesn't belong to clusterId=" + context.getClusterId());
}
// set start row
scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix());
scan.setStartRow(entityRowKey.getRowKey());
// get the bytes for stop row
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
@ -599,6 +609,9 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
if (hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, EntityColumnPrefix.METRIC);
}
entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
parseRowKey.getRowKeyAsString());
return entity;
}

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
@ -225,26 +224,6 @@ public void testFlowActivityRowKey() {
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testFlowActivityRowKeyAsString() {
String cluster = "cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster"
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
String user = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
String fName = "dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
+ TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
Long ts = 1459900830000L;
Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
String rowKeyAsString =
new FlowActivityRowKey(cluster, ts, user, fName).getRowKeyAsString();
FlowActivityRowKey rowKey =
FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(cluster, rowKey.getClusterId());
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
assertEquals(user, rowKey.getUserId());
assertEquals(fName, rowKey.getFlowName());
}
@Test
public void testFlowRunRowKey() {
byte[] byteRowKey =

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.junit.Test;
/**
* Test for row key as string.
*/
public class TestRowKeysAsString {
private final static String CLUSTER =
"cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster"
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
private final static String USER =
TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
private final static String FLOW_NAME =
"dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
+ TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
private final static Long FLOW_RUN_ID = System.currentTimeMillis();
private final static String APPLICATION_ID =
ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
@Test(timeout = 10000)
public void testApplicationRow() {
String rowKeyAsString = new ApplicationRowKey(CLUSTER, USER, FLOW_NAME,
FLOW_RUN_ID, APPLICATION_ID).getRowKeyAsString();
ApplicationRowKey rowKey =
ApplicationRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
}
@Test(timeout = 10000)
public void testEntityRowKey() {
char del = TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
char esc = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
String id = del + esc + "ent" + esc + del + "ity" + esc + del + esc + "id"
+ esc + del + esc;
String type = "entity" + esc + del + esc + "Type";
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType(type);
entity.setIdPrefix(54321);
String rowKeyAsString =
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entity.getType(), entity.getIdPrefix(), entity.getId())
.getRowKeyAsString();
EntityRowKey rowKey = EntityRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
}
@Test(timeout = 10000)
public void testFlowActivityRowKey() {
Long ts = 1459900830000L;
Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
String rowKeyAsString = new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME)
.getRowKeyAsString();
FlowActivityRowKey rowKey =
FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
}
@Test(timeout = 10000)
public void testFlowRunRowKey() {
String rowKeyAsString =
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
.getRowKeyAsString();
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
}
}

View File

@ -99,19 +99,10 @@
* filter list, event filters can be evaluated with logical AND/OR and we can
* create a hierarchy of these {@link TimelineExistsFilter} objects. If null or
* empty, the filter is not applied.</li>
* <li><b>fromIdPrefix</b> - If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix is same
* for all entities of a given entity type, then the user must provide fromId as
* a filter to denote the start entity from which further entities will be
* fetched. fromIdPrefix is mandatory even in the case the entity id prefix is
* not used and should be set to 0.</li>
* <li><b>fromId</b> - If specified along with fromIdPrefix, retrieve entities
* with an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal to entity
* id specified in fromId. Please note than fromIdPrefix is mandatory if fromId
* is specified, otherwise, the filter will be ignored. It is recommended to
* provide both fromIdPrefix and fromId filters for more accurate results as id
* prefix may not be unique for an entity.</li>
* <li><b>fromId</b> - If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified fromId.
* fromId should be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.</li>
* </ul>
*/
@Private
@ -126,7 +117,6 @@ public class TimelineEntityFilters {
private TimelineFilterList configFilters;
private TimelineFilterList metricFilters;
private TimelineFilterList eventFilters;
private Long fromIdPrefix;
private String fromId;
private static final long DEFAULT_BEGIN_TIME = 0L;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
@ -146,11 +136,10 @@ public TimelineEntityFilters(Long entityLimit, Long timeBegin, Long timeEnd,
TimelineFilterList entityInfoFilters,
TimelineFilterList entityConfigFilters,
TimelineFilterList entityMetricFilters,
TimelineFilterList entityEventFilters, Long fromidprefix, String fromid) {
TimelineFilterList entityEventFilters, String fromid) {
this(entityLimit, timeBegin, timeEnd, entityRelatesTo, entityIsRelatedTo,
entityInfoFilters, entityConfigFilters, entityMetricFilters,
entityEventFilters);
this.fromIdPrefix = fromidprefix;
this.fromId = fromid;
}
@ -276,12 +265,4 @@ public String getFromId() {
public void setFromId(String fromId) {
this.fromId = fromId;
}
public Long getFromIdPrefix() {
return fromIdPrefix;
}
public void setFromIdPrefix(Long fromIdPrefix) {
this.fromIdPrefix = fromIdPrefix;
}
}

View File

@ -265,20 +265,10 @@ public TimelineAbout about(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID
* info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -310,7 +300,6 @@ public Set<TimelineEntity> getEntities(
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
@ -335,7 +324,7 @@ public Set<TimelineEntity> getEntities(
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
fromIdPrefix, fromId),
fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -418,20 +407,10 @@ public Set<TimelineEntity> getEntities(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID
* info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -468,12 +447,11 @@ public Set<TimelineEntity> getEntities(
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getEntities(req, res, null, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId);
}
@ -545,20 +523,10 @@ public Set<TimelineEntity> getEntities(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID
* info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -596,7 +564,6 @@ public Set<TimelineEntity> getEntities(
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
@ -617,7 +584,7 @@ public Set<TimelineEntity> getEntities(
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
fromIdPrefix, fromId),
fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -1098,9 +1065,10 @@ public TimelineEntity getFlowRun(
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
* @param fromId Defines the flow run id. If specified, retrieve the next
* set of flow runs from the given id. The set of flow runs retrieved
* is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of flow run entities
* from the given fromId. The set of entities retrieved is inclusive of
* specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@ -1145,7 +1113,7 @@ public Set<TimelineEntity> getFlowRuns(
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, null, fromId),
null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
@ -1188,9 +1156,10 @@ public Set<TimelineEntity> getFlowRuns(
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
* @param fromId Defines the flow run id. If specified, retrieve the next
* set of flow runs from the given id. The set of flow runs retrieved
* is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of flow run entities
* from the given fromId. The set of entities retrieved is inclusive of
* specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@ -1247,9 +1216,10 @@ public Set<TimelineEntity> getFlowRuns(
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
* @param fromId Defines the flow run id. If specified, retrieve the next
* set of flow runs from the given id. The set of flow runs retrieved
* is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of flow run entities
* from the given fromId. The set of entities retrieved is inclusive of
* specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@ -1293,7 +1263,7 @@ public Set<TimelineEntity> getFlowRuns(
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, null, fromId),
null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
@ -1423,7 +1393,7 @@ public Set<TimelineEntity> getFlows(
DateRange range = parseDateRange(dateRange);
TimelineEntityFilters entityFilters =
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, null, null, null, null, null, null, null, null, null,
limit, null, null, null, null, null, null, null, null,
fromId);
entityFilters.setCreatedTimeBegin(range.dateStart);
entityFilters.setCreatedTimeEnd(range.dateEnd);
@ -1744,9 +1714,10 @@ public TimelineEntity getApp(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -1799,7 +1770,7 @@ public Set<TimelineEntity> getFlowRunApps(
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, null,
infofilters, conffilters, metricfilters, eventfilters,
fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
@ -1876,9 +1847,10 @@ public Set<TimelineEntity> getFlowRunApps(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -1916,7 +1888,7 @@ public Set<TimelineEntity> getFlowRunApps(
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -1980,9 +1952,10 @@ public Set<TimelineEntity> getFlowRunApps(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -2022,7 +1995,7 @@ public Set<TimelineEntity> getFlowRunApps(
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2083,9 +2056,10 @@ public Set<TimelineEntity> getFlowRunApps(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -2122,7 +2096,7 @@ public Set<TimelineEntity> getFlowApps(
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2184,9 +2158,10 @@ public Set<TimelineEntity> getFlowApps(
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -2224,7 +2199,7 @@ public Set<TimelineEntity> getFlowApps(
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2295,21 +2270,11 @@ public Set<TimelineEntity> getFlowApps(
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should
* be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the app-attempt
@ -2343,13 +2308,12 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId,
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve,
metricsToRetrieve, fields, metricsLimit, fromIdPrefix, fromId);
metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2421,21 +2385,11 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should
* be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the app-attempts
@ -2470,7 +2424,6 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, appId,
@ -2478,7 +2431,7 @@ public Set<TimelineEntity> getAppAttempts(@Context HttpServletRequest req,
flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromIdPrefix, fromId);
fromId);
}
/**
@ -2700,21 +2653,11 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container
* entities retrieved is inclusive of specified fromId. fromId should
* be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the containers
@ -2749,12 +2692,11 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getContainers(req, res, null, appId, appattemptId, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId);
}
@ -2829,21 +2771,11 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container
* entities retrieved is inclusive of specified fromId. fromId should
* be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the containers
@ -2880,7 +2812,6 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String entityType = TimelineEntityType.YARN_CONTAINER.toString();
@ -2899,7 +2830,7 @@ public Set<TimelineEntity> getContainers(@Context HttpServletRequest req,
return getEntities(req, res, clusterId, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilter, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId);
}

View File

@ -74,14 +74,14 @@ static TimelineReaderContext createTimelineReaderContext(String clusterId,
static TimelineEntityFilters createTimelineEntityFilters(String limit,
String createdTimeStart, String createdTimeEnd, String relatesTo,
String isRelatedTo, String infofilters, String conffilters,
String metricfilters, String eventfilters, String fromidprefix,
String metricfilters, String eventfilters,
String fromid) throws TimelineParseException {
return new TimelineEntityFilters(parseLongStr(limit),
parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd),
parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo),
parseKVFilters(infofilters, false), parseKVFilters(conffilters, true),
parseMetricFilters(metricfilters), parseEventFilters(eventfilters),
parseLongStr(fromidprefix), parseStr(fromid));
parseStr(fromid));
}
/**