YARN-6256. Add FROM_ID info key for timeline entities in reader response (Rohith Sharma K S via Varun Saxena)

Varun Saxena 2017-03-07 23:54:38 +05:30
parent 8bb2646595
commit c3bd8d6ad3
14 changed files with 445 additions and 261 deletions
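Taken together, the patch replaces the old fromidprefix/fromid query-parameter pair with a single opaque fromid cursor: every entity returned by the reader now carries a FROM_ID key in its info map (the encoded row key of that entity), which clients echo back to page through results. Below is a minimal client-side sketch of that flow, assuming Jersey 1.x as used by the tests that follow; the endpoint path, port, and the getEntities helper are illustrative, not part of this patch.

import java.net.URI;
import java.util.List;
import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;

public class FromIdPaginationSketch {
  public static void main(String[] args) {
    Client client = Client.create();
    // Illustrative reader endpoint; see TimelineReaderWebServices for routes.
    String resourceUri = "http://localhost:8188/ws/v2/timeline/clusters/"
        + "cluster1/apps/application_1231111111_1111/entities/type1";
    String queryParam = "?limit=4";
    List<TimelineEntity> entities =
        getEntities(client, URI.create(resourceUri + queryParam));
    while (entities.size() > 1) {
      TimelineEntity last = entities.get(entities.size() - 1);
      // FROM_ID is added to each entity's info map by the reader; feeding
      // it back as fromid resumes the scan at that entity (inclusive).
      // Real clients should URL-encode the value before appending it.
      Object fromId = last.getInfo().get(TimelineReaderUtils.FROMID_KEY);
      entities = getEntities(client,
          URI.create(resourceUri + queryParam + "&fromid=" + fromId));
    }
  }

  private static List<TimelineEntity> getEntities(Client client, URI uri) {
    ClientResponse resp = client.resource(uri)
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    return resp.getEntity(new GenericType<List<TimelineEntity>>() {
    });
  }
}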

View File

@ -825,7 +825,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
assertEquals(2, entities1.size());
for (TimelineEntity entity : entities1) {
assertNotNull(entity.getInfo());
assertEquals(1, entity.getInfo().size());
assertEquals(2, entity.getInfo().size());
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
@ -853,7 +853,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
assertEquals(2, entities2.size());
for (TimelineEntity entity : entities2) {
assertNotNull(entity.getInfo());
assertEquals(1, entity.getInfo().size());
assertEquals(2, entity.getInfo().size());
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
@ -1417,8 +1417,9 @@ public class TestTimelineReaderWebServicesHBaseStorage
infoCnt += entity.getInfo().size();
assertEquals("entity2", entity.getId());
}
// Includes UID in info field even if fields not specified as INFO.
assertEquals(1, infoCnt);
// Includes UID and FROM_ID in info field even if fields not specified as
// INFO.
assertEquals(2, infoCnt);
// infofilters=(info1 eq cluster1 AND info4 eq 35000) OR
// (info1 eq cluster2 AND info2 eq 2.0)
@ -1436,8 +1437,8 @@ public class TestTimelineReaderWebServicesHBaseStorage
infoCnt += entity.getInfo().size();
assertEquals("entity2", entity.getId());
}
// Includes UID in info field.
assertEquals(4, infoCnt);
// Includes UID and FROM_ID in info field.
assertEquals(5, infoCnt);
// Test for behavior when compare op is ne (not equals) vs ene
// (exists and not equals). info3 does not exist for entity2. For ne,
@ -2171,8 +2172,8 @@ public class TestTimelineReaderWebServicesHBaseStorage
// verify for entity-10 to entity-7 in descending order.
TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix()
+ "&&fromid=" + entity.getId();
queryParam = "?limit=" + limit + "&fromid="
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
@ -2180,7 +2181,8 @@ public class TestTimelineReaderWebServicesHBaseStorage
// verify for entity-7 to entity-4 in descending order.
entity = verifyPaginatedEntites(entities, limit, 7);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
queryParam = "?limit=" + limit + "&fromid="
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
@ -2188,7 +2190,8 @@ public class TestTimelineReaderWebServicesHBaseStorage
// verify for entity-4 to entity-1 in descending order.
entity = verifyPaginatedEntites(entities, limit, 4);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
queryParam = "?limit=" + limit + "&fromid="
+ entity.getInfo().get(TimelineReaderUtils.FROMID_KEY);
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
@ -2264,7 +2267,8 @@ public class TestTimelineReaderWebServicesHBaseStorage
TimelineEntity entity10 = entities.get(limit - 1);
uri =
URI.create(resourceUri + queryParam + "&fromid=" + entity10.getId());
URI.create(resourceUri + queryParam + "&fromid="
+ entity10.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
@ -2308,7 +2312,8 @@ public class TestTimelineReaderWebServicesHBaseStorage
TimelineEntity entity3 = entities.get(limit - 1);
uri =
URI.create(resourceUri + queryParam + "&fromid=" + entity3.getId());
URI.create(resourceUri + queryParam + "&fromid="
+ entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
@ -2352,7 +2357,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
TimelineEntity entity2 = entities.get(limit - 1);
uri = URI.create(resourceUri + queryParam + "&fromid="
+ entity2.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+ entity2.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
@ -2362,7 +2367,7 @@ public class TestTimelineReaderWebServicesHBaseStorage
assertEquals(entity3, entities.get(1));
uri = URI.create(resourceUri + queryParam + "&fromid="
+ entity3.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+ entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY));
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});

View File

@ -389,6 +389,8 @@ public class TestHBaseTimelineStorageApps {
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
Map<String, Object> infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparison.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, infoMap2);
Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
@ -448,6 +450,9 @@ public class TestHBaseTimelineStorageApps {
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparison.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, e1.getInfo());
assertEquals(isRelatedTo, e1.getIsRelatedToEntities());
assertEquals(relatesTo, e1.getRelatesToEntities());
@ -680,7 +685,7 @@ public class TestHBaseTimelineStorageApps {
}
assertEquals(5, cfgCnt);
assertEquals(3, metricCnt);
assertEquals(5, infoCnt);
assertEquals(8, infoCnt);
assertEquals(4, eventCnt);
assertEquals(4, relatesToCnt);
assertEquals(4, isRelatedToCnt);
@ -744,7 +749,8 @@ public class TestHBaseTimelineStorageApps {
TimelineEntityType.YARN_APPLICATION.toString(), null),
new TimelineDataToRetrieve());
assertNotNull(e1);
assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
assertEquals(1, e1.getInfo().size());
assertTrue(e1.getConfigs().isEmpty() &&
e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
e1.getRelatesToEntities().isEmpty());
Set<TimelineEntity> es1 = reader.getEntities(
@ -755,7 +761,8 @@ public class TestHBaseTimelineStorageApps {
new TimelineDataToRetrieve());
assertEquals(3, es1.size());
for (TimelineEntity e : es1) {
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
assertEquals(1, e.getInfo().size());
assertTrue(e.getConfigs().isEmpty() &&
e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
e.getRelatesToEntities().isEmpty());
}
@ -788,7 +795,7 @@ public class TestHBaseTimelineStorageApps {
isRelatedToCnt += entity.getIsRelatedToEntities().size();
infoCnt += entity.getInfo().size();
}
assertEquals(0, infoCnt);
assertEquals(3, infoCnt);
assertEquals(4, isRelatedToCnt);
assertEquals(3, metricsCnt);
}
@ -1770,7 +1777,7 @@ public class TestHBaseTimelineStorageApps {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(5, infoCnt);
assertEquals(7, infoCnt);
TimelineFilterList infoFilterList1 = new TimelineFilterList(
new TimelineKeyValueFilter(
@ -1787,7 +1794,7 @@ public class TestHBaseTimelineStorageApps {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(3, infoCnt);
assertEquals(4, infoCnt);
TimelineFilterList infoFilterList2 = new TimelineFilterList(
new TimelineKeyValueFilter(

View File

@ -311,6 +311,8 @@ public class TestHBaseTimelineStorageEntities {
assertEquals(type, e1.getType());
assertEquals(cTime, e1.getCreatedTime());
Map<String, Object> infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparison.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, infoMap2);
Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
@ -336,7 +338,10 @@ public class TestHBaseTimelineStorageEntities {
assertEquals(id, e1.getId());
assertEquals(type, e1.getType());
assertEquals(cTime, e1.getCreatedTime());
assertEquals(infoMap, e1.getInfo());
infoMap2 = e1.getInfo();
// fromid key is added by storage. Remove it for comparison.
infoMap2.remove("FROM_ID");
assertEquals(infoMap, infoMap2);
assertEquals(isRelatedTo, e1.getIsRelatedToEntities());
assertEquals(relatesTo, e1.getRelatesToEntities());
assertEquals(conf, e1.getConfigs());
@ -573,7 +578,7 @@ public class TestHBaseTimelineStorageEntities {
}
assertEquals(5, cfgCnt);
assertEquals(3, metricCnt);
assertEquals(5, infoCnt);
assertEquals(8, infoCnt);
assertEquals(4, eventCnt);
assertEquals(4, relatesToCnt);
assertEquals(4, isRelatedToCnt);
@ -1126,7 +1131,8 @@ public class TestHBaseTimelineStorageEntities {
1002345678919L, "application_1231111111_1111", "world", "hello"),
new TimelineDataToRetrieve());
assertNotNull(e1);
assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
assertEquals(1, e1.getInfo().size());
assertTrue(e1.getConfigs().isEmpty() &&
e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
e1.getRelatesToEntities().isEmpty());
Set<TimelineEntity> es1 = reader.getEntities(
@ -1136,9 +1142,10 @@ public class TestHBaseTimelineStorageEntities {
new TimelineDataToRetrieve());
assertEquals(3, es1.size());
for (TimelineEntity e : es1) {
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
assertTrue(e.getConfigs().isEmpty() &&
e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
e.getRelatesToEntities().isEmpty());
assertEquals(1, e.getInfo().size());
}
}
@ -1167,7 +1174,7 @@ public class TestHBaseTimelineStorageEntities {
isRelatedToCnt += entity.getIsRelatedToEntities().size();
infoCnt += entity.getInfo().size();
}
assertEquals(0, infoCnt);
assertEquals(3, infoCnt);
assertEquals(4, isRelatedToCnt);
assertEquals(3, metricsCnt);
}
@ -1603,7 +1610,7 @@ public class TestHBaseTimelineStorageEntities {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(5, infoCnt);
assertEquals(7, infoCnt);
TimelineFilterList infoFilterList1 = new TimelineFilterList(
new TimelineKeyValueFilter(
@ -1619,7 +1626,7 @@ public class TestHBaseTimelineStorageEntities {
for (TimelineEntity entity : entities) {
infoCnt += entity.getInfo().size();
}
assertEquals(3, infoCnt);
assertEquals(4, infoCnt);
TimelineFilterList infoFilterList2 = new TimelineFilterList(
new TimelineKeyValueFilter(

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
@ -33,7 +37,7 @@ public class ApplicationRowKey {
private final String flowName;
private final Long flowRunId;
private final String appId;
private final KeyConverter<ApplicationRowKey> appRowKeyConverter =
private final ApplicationRowKeyConverter appRowKeyConverter =
new ApplicationRowKeyConverter();
public ApplicationRowKey(String clusterId, String userId, String flowName,
@ -85,6 +89,24 @@ public class ApplicationRowKey {
return new ApplicationRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the application table as follows:
* {@code clusterId!userName!flowName!flowRunId!AppId}.
* @return String representation of row key.
*/
public String getRowKeyAsString() {
return appRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return An <cite>ApplicationRowKey</cite> object.
*/
public static ApplicationRowKey parseRowKeyFromString(String encodedRowKey) {
return new ApplicationRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* Encodes and decodes row key for application table. The row key is of the
* form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a long,
@ -93,7 +115,7 @@ public class ApplicationRowKey {
* <p>
*/
final private static class ApplicationRowKeyConverter implements
KeyConverter<ApplicationRowKey> {
KeyConverter<ApplicationRowKey>, KeyConverterToString<ApplicationRowKey> {
private final KeyConverter<String> appIDKeyConverter =
new AppIdKeyConverter();
@ -201,6 +223,29 @@ public class ApplicationRowKey {
return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
appId);
}
@Override
public String encodeAsString(ApplicationRowKey key) {
if (key.clusterId == null || key.userId == null || key.flowName == null
|| key.flowRunId == null || key.appId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils
.joinAndEscapeStrings(new String[] {key.clusterId, key.userId,
key.flowName, key.flowRunId.toString(), key.appId});
}
@Override
public ApplicationRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 5) {
throw new IllegalArgumentException(
"Invalid row key for application table.");
}
Long flowRunId = Long.valueOf(split.get(3));
return new ApplicationRowKey(split.get(0), split.get(1), split.get(2),
flowRunId, split.get(4));
}
}
}
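The string produced by getRowKeyAsString is exactly what the reader hands back under the FROM_ID info key. A minimal round-trip sketch, with illustrative values and assuming the default '!' delimiter and escape handling in TimelineReaderUtils:

ApplicationRowKey key = new ApplicationRowKey("cluster1", "user1",
    "flow_name", 1002345678919L, "application_1231111111_1111");
String encoded = key.getRowKeyAsString();
// e.g. "cluster1!user1!flow_name!1002345678919!application_1231111111_1111";
// delimiter or escape characters inside a component are escaped by
// joinAndEscapeStrings so that split() can recover them losslessly.
ApplicationRowKey decoded = ApplicationRowKey.parseRowKeyFromString(encoded);
assert decoded.getFlowRunId().equals(key.getFlowRunId());
assert decoded.getAppId().equals(key.getAppId());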

View File

@ -17,9 +17,13 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
@ -35,7 +39,7 @@ public class EntityRowKey {
private final String entityType;
private final Long entityIdPrefix;
private final String entityId;
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
private final EntityRowKeyConverter entityRowKeyConverter =
new EntityRowKeyConverter();
public EntityRowKey(String clusterId, String userId, String flowName,
@ -96,7 +100,6 @@ public class EntityRowKey {
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey byte representation of row key.
* @return An <cite>EntityRowKey</cite> object.
*/
@ -104,6 +107,27 @@ public class EntityRowKey {
return new EntityRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the entity table as follows:
* <p>
* {@code userName!clusterId!flowName!flowRunId!AppId!
* entityType!entityIdPrefix!entityId}.
* </p>
* @return String representation of row key.
*/
public String getRowKeyAsString() {
return entityRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return An <cite>EntityRowKey</cite> object.
*/
public static EntityRowKey parseRowKeyFromString(String encodedRowKey) {
return new EntityRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* Encodes and decodes row key for entity table. The row key is of the form :
* userName!clusterId!flowName!flowRunId!appId!entityType!entityId. flowRunId
@ -112,7 +136,7 @@ public class EntityRowKey {
* <p>
*/
final private static class EntityRowKeyConverter implements
KeyConverter<EntityRowKey> {
KeyConverter<EntityRowKey>, KeyConverterToString<EntityRowKey> {
private final AppIdKeyConverter appIDKeyConverter = new AppIdKeyConverter();
@ -245,5 +269,31 @@ public class EntityRowKey {
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
entityType, entityPrefixId, entityId);
}
@Override
public String encodeAsString(EntityRowKey key) {
if (key.clusterId == null || key.userId == null || key.flowName == null
|| key.flowRunId == null || key.appId == null
|| key.entityType == null || key.entityIdPrefix == null
|| key.entityId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils
.joinAndEscapeStrings(new String[] {key.clusterId, key.userId,
key.flowName, key.flowRunId.toString(), key.appId, key.entityType,
key.entityIdPrefix.toString(), key.entityId});
}
@Override
public EntityRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 8) {
throw new IllegalArgumentException("Invalid row key for entity table.");
}
Long flowRunId = Long.valueOf(split.get(3));
Long entityIdPrefix = Long.valueOf(split.get(6));
return new EntityRowKey(split.get(0), split.get(1), split.get(2),
flowRunId, split.get(4), split.get(5), entityIdPrefix, split.get(7));
}
}
}
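Because the encoded entity row key carries the entityIdPrefix component as well, a single fromid value subsumes the old fromidprefix/fromid query-parameter pair removed elsewhere in this patch. A short sketch with illustrative values:

EntityRowKey key = new EntityRowKey("cluster1", "user1", "flow_name",
    1002345678919L, "application_1231111111_1111", "type1", 12345L,
    "entity1");
String fromId = key.getRowKeyAsString();
// All eight components, including the 12345 id prefix, round-trip, so
// GenericEntityReader can resume its scan at exactly this row.
EntityRowKey decoded = EntityRowKey.parseRowKeyFromString(fromId);
assert decoded.getEntityIdPrefix().longValue() == 12345L;
assert "entity1".equals(decoded.getEntityId());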

View File

@ -17,8 +17,12 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
@ -70,7 +74,6 @@ public class FlowRunRowKey {
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey Byte representation of row key.
* @return A <cite>FlowRunRowKey</cite> object.
*/
@ -78,6 +81,24 @@ public class FlowRunRowKey {
return new FlowRunRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the flow run table as follows:
* {@code clusterId!userId!flowName!Flow Run Id}.
* @return String representation of row key.
*/
public String getRowKeyAsString() {
return flowRunRowKeyConverter.encodeAsString(this);
}
/**
* Given the encoded row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return A <cite>FlowRunRowKey</cite> object.
*/
public static FlowRunRowKey parseRowKeyFromString(String encodedRowKey) {
return new FlowRunRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* returns the Flow Key as a verbose String output.
* @return String
@ -101,7 +122,7 @@ public class FlowRunRowKey {
* <p>
*/
final private static class FlowRunRowKeyConverter implements
KeyConverter<FlowRunRowKey> {
KeyConverter<FlowRunRowKey>, KeyConverterToString<FlowRunRowKey> {
private FlowRunRowKeyConverter() {
}
@ -186,5 +207,27 @@ public class FlowRunRowKey {
LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
}
@Override
public String encodeAsString(FlowRunRowKey key) {
if (key.clusterId == null || key.userId == null || key.flowName == null
|| key.flowRunId == null) {
throw new IllegalArgumentException();
}
return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
key.clusterId, key.userId, key.flowName, key.flowRunId.toString()});
}
@Override
public FlowRunRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 4) {
throw new IllegalArgumentException(
"Invalid row key for flow run table.");
}
Long flowRunId = Long.valueOf(split.get(3));
return new FlowRunRowKey(split.get(0), split.get(1), split.get(2),
flowRunId);
}
}
}
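For flow runs, fromid used to be the bare run id parsed with Long.parseLong (see FlowRunEntityReader below), and the tests above previously fed SYSTEM_INFO_FLOW_RUN_ID back as the cursor; it is now the full encoded row key. A sketch with illustrative values:

FlowRunRowKey key = new FlowRunRowKey("cluster1", "user1", "flow_name",
    1002345678919L);
String fromId = key.getRowKeyAsString();
// e.g. "cluster1!user1!flow_name!1002345678919" rather than the bare
// "1002345678919" accepted before this change.
FlowRunRowKey decoded = FlowRunRowKey.parseRowKeyFromString(fromId);
assert decoded.getFlowRunId().equals(key.getFlowRunId());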

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -48,11 +49,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import com.google.common.base.Preconditions;
@ -372,18 +373,17 @@ class ApplicationEntityReader extends GenericEntityReader {
context.getFlowRunId());
scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
} else {
Long flowRunId = context.getFlowRunId();
if (flowRunId == null) {
AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(
getFilters().getFromId());
FlowContext flowContext = lookupFlowContext(appToFlowRowKey,
context.getClusterId(), hbaseConf, conn);
flowRunId = flowContext.getFlowRunId();
ApplicationRowKey applicationRowKey = null;
try {
applicationRowKey =
ApplicationRowKey.parseRowKeyFromString(getFilters().getFromId());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid filter fromid is provided.");
}
if (!context.getClusterId().equals(applicationRowKey.getClusterId())) {
throw new BadRequestException(
"fromid doesn't belong to clusterId=" + context.getClusterId());
}
ApplicationRowKey applicationRowKey =
new ApplicationRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), flowRunId, getFilters().getFromId());
// set start row
scan.setStartRow(applicationRowKey.getRowKey());
@ -497,6 +497,10 @@ class ApplicationEntityReader extends GenericEntityReader {
if (hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
}
ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(result.getRow());
entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
rowKey.getRowKeyAsString());
return entity;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -217,11 +218,17 @@ class FlowRunEntityReader extends TimelineEntityReader {
context.getUserId(), context.getFlowName());
scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
} else {
FlowRunRowKey flowRunRowKey =
new FlowRunRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), Long.parseLong(getFilters().getFromId()));
FlowRunRowKey flowRunRowKey = null;
try {
flowRunRowKey =
FlowRunRowKey.parseRowKeyFromString(getFilters().getFromId());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid filter fromid is provided.");
}
if (!context.getClusterId().equals(flowRunRowKey.getClusterId())) {
throw new BadRequestException(
"fromid doesn't belong to clusterId=" + context.getClusterId());
}
// set start row
scan.setStartRow(flowRunRowKey.getRowKey());
@ -247,16 +254,11 @@ class FlowRunEntityReader extends TimelineEntityReader {
@Override
protected TimelineEntity parseEntity(Result result) throws IOException {
TimelineReaderContext context = getContext();
FlowRunEntity flowRun = new FlowRunEntity();
flowRun.setUser(context.getUserId());
flowRun.setName(context.getFlowName());
if (isSingleEntityRead()) {
flowRun.setRunId(context.getFlowRunId());
} else {
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
flowRun.setRunId(rowKey.getFlowRunId());
}
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
flowRun.setRunId(rowKey.getFlowRunId());
flowRun.setUser(rowKey.getUserId());
flowRun.setName(rowKey.getFlowName());
// read the start time
Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
@ -285,6 +287,8 @@ class FlowRunEntityReader extends TimelineEntityReader {
// set the id
flowRun.setId(flowRun.getId());
flowRun.getInfo().put(TimelineReaderUtils.FROMID_KEY,
rowKey.getRowKeyAsString());
return flowRun;
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import com.google.common.base.Preconditions;
@ -475,19 +477,27 @@ class GenericEntityReader extends TimelineEntityReader {
TimelineReaderContext context = getContext();
RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = null;
// default mode, will always scans from beginning of entity type.
if (getFilters() == null || getFilters().getFromIdPrefix() == null) {
if (getFilters() == null || getFilters().getFromId() == null) {
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(), null, null);
scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
} else { // pagination mode, will scan from given entityIdPrefix!enitityId
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(),
getFilters().getFromIdPrefix(), getFilters().getFromId());
EntityRowKey entityRowKey = null;
try {
entityRowKey =
EntityRowKey.parseRowKeyFromString(getFilters().getFromId());
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid filter fromid is provided.");
}
if (!context.getClusterId().equals(entityRowKey.getClusterId())) {
throw new BadRequestException(
"fromid doesn't belong to clusterId=" + context.getClusterId());
}
// set start row
scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix());
scan.setStartRow(entityRowKey.getRowKey());
// get the bytes for stop row
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
@ -599,6 +609,9 @@ class GenericEntityReader extends TimelineEntityReader {
if (hasField(fieldsToRetrieve, Field.METRICS)) {
readMetrics(entity, result, EntityColumnPrefix.METRIC);
}
entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
parseRowKey.getRowKeyAsString());
return entity;
}

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
@ -225,26 +224,6 @@ public class TestRowKeys {
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testFlowActivityRowKeyAsString() {
String cluster = "cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster"
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
String user = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
String fName = "dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
+ TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
Long ts = 1459900830000L;
Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
String rowKeyAsString =
new FlowActivityRowKey(cluster, ts, user, fName).getRowKeyAsString();
FlowActivityRowKey rowKey =
FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(cluster, rowKey.getClusterId());
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
assertEquals(user, rowKey.getUserId());
assertEquals(fName, rowKey.getFlowName());
}
@Test
public void testFlowRunRowKey() {
byte[] byteRowKey =

View File

@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.junit.Test;
/**
* Test for row key as string.
*/
public class TestRowKeysAsString {
private final static String CLUSTER =
"cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster"
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
private final static String USER =
TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
private final static String FLOW_NAME =
"dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
+ TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
+ TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
private final static Long FLOW_RUN_ID = System.currentTimeMillis();
private final static String APPLICATION_ID =
ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
@Test(timeout = 10000)
public void testApplicationRow() {
String rowKeyAsString = new ApplicationRowKey(CLUSTER, USER, FLOW_NAME,
FLOW_RUN_ID, APPLICATION_ID).getRowKeyAsString();
ApplicationRowKey rowKey =
ApplicationRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
}
@Test(timeout = 10000)
public void testEntityRowKey() {
char del = TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
char esc = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
String id = del + esc + "ent" + esc + del + "ity" + esc + del + esc + "id"
+ esc + del + esc;
String type = "entity" + esc + del + esc + "Type";
TimelineEntity entity = new TimelineEntity();
entity.setId(id);
entity.setType(type);
entity.setIdPrefix(54321);
String rowKeyAsString =
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entity.getType(), entity.getIdPrefix(), entity.getId())
.getRowKeyAsString();
EntityRowKey rowKey = EntityRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
}
@Test(timeout = 10000)
public void testFlowActivityRowKey() {
Long ts = 1459900830000L;
Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
String rowKeyAsString = new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME)
.getRowKeyAsString();
FlowActivityRowKey rowKey =
FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
}
@Test(timeout = 10000)
public void testFlowRunRowKey() {
String rowKeyAsString =
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
.getRowKeyAsString();
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKeyFromString(rowKeyAsString);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
}
}

View File

@ -99,19 +99,10 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa
* filter list, event filters can be evaluated with logical AND/OR and we can
* create a hierarchy of these {@link TimelineExistsFilter} objects. If null or
* empty, the filter is not applied.</li>
* <li><b>fromIdPrefix</b> - If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix is same
* for all entities of a given entity type, then the user must provide fromId as
* a filter to denote the start entity from which further entities will be
* fetched. fromIdPrefix is mandatory even in the case the entity id prefix is
* not used and should be set to 0.</li>
* <li><b>fromId</b> - If specified along with fromIdPrefix, retrieve entities
* with an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal to entity
* id specified in fromId. Please note than fromIdPrefix is mandatory if fromId
* is specified, otherwise, the filter will be ignored. It is recommended to
* provide both fromIdPrefix and fromId filters for more accurate results as id
* prefix may not be unique for an entity.</li>
* <li><b>fromId</b> - If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified fromId.
* fromId should be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.</li>
* </ul>
*/
@Private
@ -126,7 +117,6 @@ public class TimelineEntityFilters {
private TimelineFilterList configFilters;
private TimelineFilterList metricFilters;
private TimelineFilterList eventFilters;
private Long fromIdPrefix;
private String fromId;
private static final long DEFAULT_BEGIN_TIME = 0L;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
@ -146,11 +136,10 @@ public class TimelineEntityFilters {
TimelineFilterList entityInfoFilters,
TimelineFilterList entityConfigFilters,
TimelineFilterList entityMetricFilters,
TimelineFilterList entityEventFilters, Long fromidprefix, String fromid) {
TimelineFilterList entityEventFilters, String fromid) {
this(entityLimit, timeBegin, timeEnd, entityRelatesTo, entityIsRelatedTo,
entityInfoFilters, entityConfigFilters, entityMetricFilters,
entityEventFilters);
this.fromIdPrefix = fromidprefix;
this.fromId = fromid;
}
@ -276,12 +265,4 @@ public class TimelineEntityFilters {
public void setFromId(String fromId) {
this.fromId = fromId;
}
public Long getFromIdPrefix() {
return fromIdPrefix;
}
public void setFromIdPrefix(Long fromIdPrefix) {
this.fromIdPrefix = fromIdPrefix;
}
}

View File

@ -265,20 +265,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID
* info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -310,7 +300,6 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
@ -335,7 +324,7 @@ public class TimelineReaderWebServices {
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
fromIdPrefix, fromId),
fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -418,20 +407,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID
* info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -468,12 +447,11 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getEntities(req, res, null, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId);
}
@ -545,20 +523,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of entities from the
* given fromId. The set of entities retrieved is inclusive of specified
* fromId. fromId should be taken from the value associated with FROM_ID
* info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -596,7 +564,6 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
@ -617,7 +584,7 @@ public class TimelineReaderWebServices {
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
fromIdPrefix, fromId),
fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -1098,9 +1065,10 @@ public class TimelineReaderWebServices {
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
* @param fromId Defines the flow run id. If specified, retrieve the next
* set of flow runs from the given id. The set of flow runs retrieved
* is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of flow run entities
* from the given fromId. The set of entities retrieved is inclusive of
* specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@ -1145,7 +1113,7 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, null, fromId),
null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
@ -1188,9 +1156,10 @@ public class TimelineReaderWebServices {
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
* @param fromId Defines the flow run id. If specified, retrieve the next
* set of flow runs from the given id. The set of flow runs retrieved
* is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of flow run entities
* from the given fromId. The set of entities retrieved is inclusive of
* specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@ -1247,9 +1216,10 @@ public class TimelineReaderWebServices {
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
* @param fromId Defines the flow run id. If specified, retrieve the next
* set of flow runs from the given id. The set of flow runs retrieved
* is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of flow run entities
* from the given fromId. The set of entities retrieved is inclusive of
* specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@ -1293,7 +1263,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null, null, fromId),
null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
@ -1423,7 +1393,7 @@ public class TimelineReaderWebServices {
DateRange range = parseDateRange(dateRange);
TimelineEntityFilters entityFilters =
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, null, null, null, null, null, null, null, null, null,
limit, null, null, null, null, null, null, null, null,
fromId);
entityFilters.setCreatedTimeBegin(range.dateStart);
entityFilters.setCreatedTimeEnd(range.dateEnd);
@ -1744,9 +1714,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -1799,7 +1770,7 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, null,
infofilters, conffilters, metricfilters, eventfilters,
fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
@ -1876,9 +1847,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -1916,7 +1888,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -1980,9 +1952,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -2022,7 +1995,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2083,9 +2056,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -2122,7 +2096,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2184,9 +2158,10 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromId Defines the application id. If specified, retrieve the next
* set of applications from the given id. The set of applications
* retrieved is inclusive of specified fromId.
* @param fromId If specified, retrieve the next set of applications
* from the given fromId. The set of applications retrieved is inclusive
* of specified fromId. fromId should be taken from the value associated
* with FROM_ID info key in entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@ -2224,7 +2199,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromId);
}
/**
@ -2295,21 +2270,11 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of specified fromId. fromId should
* be taken from the value associated with FROM_ID info key in
* entity response which was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the app-attempt
@ -2343,13 +2308,12 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId,
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve,
metricsToRetrieve, fields, metricsLimit, fromIdPrefix, fromId);
metricsToRetrieve, fields, metricsLimit, fromId);
}
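
Net effect on the REST surface for this endpoint: the two-part cursor (fromidprefix plus fromid) collapses into a single opaque fromid. A hedged before/after sketch; the paths and ids below are hypothetical placeholders:

```java
// Before this change: callers supplied both the id prefix and the entity
// id, and had to keep the two consistent (hypothetical ids).
String oldStyle = "/ws/v2/timeline/apps/application_1490000000000_0001"
    + "/appattempts?limit=4&fromidprefix=0&fromid=appattempt_0001";

// After: a single opaque cursor, copied verbatim from the FROM_ID info
// key of the last entity on the previous page.
String newStyle = "/ws/v2/timeline/apps/application_1490000000000_0001"
    + "/appattempts?limit=4&fromid=<FROM_ID value from previous page>";
```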
/**
@ -2421,21 +2385,11 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note that fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of application-attempt
* entities from the given fromId. The set of application-attempt
* entities retrieved is inclusive of the specified fromId. fromId should
* be taken from the value associated with the FROM_ID info key in the
* entity response that was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the app-attempts
@ -2470,7 +2424,6 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, appId,
@ -2478,7 +2431,7 @@ public class TimelineReaderWebServices {
flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromIdPrefix, fromId);
fromId);
}
/**
@ -2700,21 +2653,11 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note that fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container
* entities retrieved is inclusive of the specified fromId. fromId should
* be taken from the value associated with the FROM_ID info key in the
* entity response that was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the containers
@ -2749,12 +2692,11 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getContainers(req, res, null, appId, appattemptId, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId);
}
@ -2829,21 +2771,11 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note that fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
* @param fromId If specified, retrieve the next set of container
* entities from the given fromId. The set of container
* entities retrieved is inclusive of the specified fromId. fromId should
* be taken from the value associated with the FROM_ID info key in the
* entity response that was sent earlier.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the containers
@ -2880,7 +2812,6 @@ public class TimelineReaderWebServices {
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String entityType = TimelineEntityType.YARN_CONTAINER.toString();
@ -2899,7 +2830,7 @@ public class TimelineReaderWebServices {
return getEntities(req, res, clusterId, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilter, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromId);
}

View File

@ -74,14 +74,14 @@ final class TimelineReaderWebServicesUtils {
static TimelineEntityFilters createTimelineEntityFilters(String limit,
String createdTimeStart, String createdTimeEnd, String relatesTo,
String isRelatedTo, String infofilters, String conffilters,
String metricfilters, String eventfilters, String fromidprefix,
String metricfilters, String eventfilters,
String fromid) throws TimelineParseException {
return new TimelineEntityFilters(parseLongStr(limit),
parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd),
parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo),
parseKVFilters(infofilters, false), parseKVFilters(conffilters, true),
parseMetricFilters(metricfilters), parseEventFilters(eventfilters),
parseLongStr(fromidprefix), parseStr(fromid));
parseStr(fromid));
}
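
For illustration, a call site in the web-service layer now forwards the raw query-parameter strings with fromid last. A sketch with hypothetical values (all unused filters null); note the method is package-private and declares TimelineParseException:

```java
// Hypothetical call site, mirroring a request like ?limit=4&fromid=<cursor>.
TimelineEntityFilters filters =
    TimelineReaderWebServicesUtils.createTimelineEntityFilters(
        "4",        // limit
        null, null, // createdTimeStart, createdTimeEnd
        null, null, // relatesTo, isRelatedTo
        null, null, // infofilters, conffilters
        null, null, // metricfilters, eventfilters
        "cursor-from-FROM_ID-info-key"); // fromid, an opaque value
```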
/**