YARN-4237 Support additional queries for ATSv2 Web UI. Contributed by Varun Saxena.
Authored by Li Lu on 2015-10-15 10:49:36 -07:00; committed by Sangjin Lee
parent b51d0fef56
commit d014f2ffd2
6 changed files with 236 additions and 11 deletions

TimelineReaderWebServices.java

@@ -449,6 +449,70 @@ public class TimelineReaderWebServices {
     return entity;
   }

+  /**
+   * Return a set of flows runs for the given flow id.
+   * Cluster ID is not provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/flowruns/{flowid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowRuns(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("flowid") String flowId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("fields") String fields) {
+    return getFlowRuns(req, res, null, flowId, userId, limit, createdTimeStart,
+        createdTimeEnd, fields);
+  }
+
+  /**
+   * Return a set of flow runs for the given cluster and flow id.
+   */
+  @GET
+  @Path("/flowruns/{clusterid}/{flowid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlowRuns(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("flowid") String flowId,
+      @QueryParam("userid") String userId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("fields") String fields) {
+    String url = req.getRequestURI() +
+        (req.getQueryString() == null ? "" :
+            QUERY_STRING_SEP + req.getQueryString());
+    UserGroupInformation callerUGI = getUser(req);
+    LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
+    long startTime = Time.monotonicNow();
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    Set<TimelineEntity> entities = null;
+    try {
+      entities = timelineReaderManager.getEntities(
+          parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
+          null, null, TimelineEntityType.YARN_FLOW_RUN.toString(),
+          parseLongStr(limit), parseLongStr(createdTimeStart),
+          parseLongStr(createdTimeEnd), null, null, null, null, null, null,
+          null, null, parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (Exception e) {
+      handleException(e, url, startTime, "createdTime start/end or limit");
+    }
+    long endTime = Time.monotonicNow();
+    if (entities == null) {
+      entities = Collections.emptySet();
+    }
+    LOG.info("Processed URL " + url +
+        " (Took " + (endTime - startTime) + " ms.)");
+    return entities;
+  }
+
   /**
    * Return a list of flows for a given cluster id. Cluster ID is not
    * provided by client so default cluster ID has to be taken.
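
For reference, the two endpoints added above can be exercised with requests of the following shape; host, port, cluster, flow and user values are illustrative and mirror the test added at the end of this commit, while limit, createdtimestart, createdtimeend and fields are the optional query parameters parsed above:

GET http://localhost:<port>/ws/v2/timeline/flowruns/flow_name?userid=user1
GET http://localhost:<port>/ws/v2/timeline/flowruns/cluster1/flow_name?userid=user1&limit=1
GET http://localhost:<port>/ws/v2/timeline/flowruns/cluster1/flow_name?userid=user1&createdtimestart=1425016500999&createdtimeend=1425016501035
GET http://localhost:<port>/ws/v2/timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics

The first form omits the cluster id and falls back to the default cluster id, as described in the javadoc above.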

FlowRunEntityReader.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -56,7 +58,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
     super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
         createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
         relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve, false);
+        eventFilters, fieldsToRetrieve, true);
   }

   public FlowRunEntityReader(String userId, String clusterId,
@@ -79,11 +81,27 @@ class FlowRunEntityReader extends TimelineEntityReader {
     Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
     Preconditions.checkNotNull(userId, "userId shouldn't be null");
     Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
+    if (singleEntityRead) {
       Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
     }
+  }

   @Override
   protected void augmentParams(Configuration hbaseConf, Connection conn) {
+    if (!singleEntityRead) {
+      if (fieldsToRetrieve == null) {
+        fieldsToRetrieve = EnumSet.noneOf(Field.class);
+      }
+      if (limit == null || limit < 0) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      if (createdTimeBegin == null) {
+        createdTimeBegin = DEFAULT_BEGIN_TIME;
+      }
+      if (createdTimeEnd == null) {
+        createdTimeEnd = DEFAULT_END_TIME;
+      }
+    }
   }

   @Override
@@ -99,8 +117,11 @@ class FlowRunEntityReader extends TimelineEntityReader {
   @Override
   protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
-    throw new UnsupportedOperationException(
-        "multiple entity query is not supported");
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(
+        FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId));
+    scan.setFilter(new PageFilter(limit));
+    return table.getResultScanner(hbaseConf, conn, scan);
   }

   @Override
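
The new getResults implementation above is a plain HBase prefix scan capped with a PageFilter. A minimal standalone sketch of the same pattern follows; the table name, configuration and the literal row key prefix are illustrative assumptions, not part of this patch, which builds the prefix via FlowRunRowKey.getRowKeyPrefix and reads through the flow run table class:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FlowRunPrefixScanSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // "timelineservice.flowrun" is an assumed table name, for illustration only.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("timelineservice.flowrun"))) {
      Scan scan = new Scan();
      // All runs of one flow share the row key prefix clusterId!userId!flowId!
      scan.setRowPrefixFilter(Bytes.toBytes("cluster1!user1!flow_name!"));
      // Cap the rows returned. Note PageFilter is applied per region server,
      // so a scan can still hand back more rows than the page size.
      scan.setFilter(new PageFilter(100L));
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println(Bytes.toStringBinary(result.getRow()));
        }
      }
    }
  }
}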
@@ -108,13 +129,23 @@ class FlowRunEntityReader extends TimelineEntityReader {
     FlowRunEntity flowRun = new FlowRunEntity();
     flowRun.setUser(userId);
     flowRun.setName(flowId);
+    if (singleEntityRead) {
       flowRun.setRunId(flowRunId);
+    } else {
+      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
+      flowRun.setRunId(rowKey.getFlowRunId());
+    }

     // read the start time
     Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result);
     if (startTime != null) {
       flowRun.setStartTime(startTime.longValue());
     }
+    if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
+        flowRun.getStartTime() > createdTimeEnd)) {
+      return null;
+    }

     // read the end time if available
     Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result);
     if (endTime != null) {
@@ -128,7 +159,9 @@ class FlowRunEntityReader extends TimelineEntityReader {
     }

     // read metrics
+    if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
       readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+    }

     // set the id
     flowRun.setId(flowRun.getId());

GenericEntityReader.java

@@ -60,9 +60,6 @@ class GenericEntityReader extends TimelineEntityReader {
   private static final EntityTable ENTITY_TABLE = new EntityTable();
   private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);

-  protected static final long DEFAULT_BEGIN_TIME = 0L;
-  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
-
   /**
    * Used to look up the flow context.
    */

TimelineEntityReader.java

@@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
  */
 abstract class TimelineEntityReader {
   private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
+  protected static final long DEFAULT_BEGIN_TIME = 0L;
+  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;

   protected final boolean singleEntityRead;

   protected String userId;

FlowRunRowKey.java

@@ -54,6 +54,21 @@ public class FlowRunRowKey {
     return flowRunId;
   }

+  /**
+   * Constructs a row key prefix for the flow run table as follows: {
+   * clusterId!userI!flowId!}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
+        flowId, ""));
+  }
+
   /**
    * Constructs a row key for the entity table as follows: {
    * clusterId!userI!flowId!Inverted Flow Run Id}
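
To make the new prefix concrete: every run of a flow shares the clusterId!userId!flowId! prefix, and the full row key appends the inverted flow run id (per the javadoc above), so a single prefix scan returns all runs of one flow. A rough illustration follows; the plain "!" joins and the Long.MAX_VALUE inversion are simplifying assumptions, since the real code encodes each component through Separator.QUALIFIERS.joinEncoded and stores the inverted run id as bytes:

public class FlowRunRowKeySketch {
  public static void main(String[] args) {
    String clusterId = "cluster1";
    String userId = "user1";
    String flowId = "flow_name";

    // Prefix shared by every run of this flow; note the trailing separator.
    String prefix = clusterId + "!" + userId + "!" + flowId + "!";

    // Assumed inversion: larger (newer) run ids map to smaller values and
    // therefore sort first under the lexicographic ordering of the scan.
    long flowRunId = 1002345678920L;
    long invertedRunId = Long.MAX_VALUE - flowRunId;

    System.out.println("prefix : " + prefix);
    System.out.println("row key: " + prefix + invertedRunId);
  }
}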

TestTimelineReaderWebServicesHBaseStorage.java

@@ -172,11 +172,11 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       id = "application_11111111111111_2223";
       entity3.setId(id);
       entity3.setType(type);
-      cTime = 1425016501030L;
+      cTime = 1425016501037L;
       entity3.setCreatedTime(cTime);
       TimelineEvent event2 = new TimelineEvent();
       event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-      event2.setTimestamp(1436512802030L);
+      event2.setTimestamp(1436512802037L);
       event2.addInfo("foo_event", "test");
       entity3.addEvent(event2);
       te3.addEntity(entity3);
@@ -364,6 +364,119 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     }
   }

+  @Test
+  public void testGetFlowRuns() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowRunEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+            (entity.getRunId() == 1002345678919L) &&
+            (entity.getStartTime() == 1425016501000L)) ||
+            ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+            (entity.getRunId() == 1002345678920L) &&
+            (entity.getStartTime() == 1425016501034L)));
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            entity.getId().equals("user1@flow_name/1002345678920") &&
+            entity.getRunId() == 1002345678920L &&
+            entity.getStartTime() == 1425016501034L);
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+          "createdtimestart=1425016501030");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            entity.getId().equals("user1@flow_name/1002345678920") &&
+            entity.getRunId() == 1002345678920L &&
+            entity.getStartTime() == 1425016501034L);
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+          "createdtimestart=1425016500999&createdtimeend=1425016501035");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+            (entity.getRunId() == 1002345678919L) &&
+            (entity.getStartTime() == 1425016501000L)) ||
+            ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+            (entity.getRunId() == 1002345678920L) &&
+            (entity.getStartTime() == 1425016501034L)));
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&" +
+          "createdtimeend=1425016501030");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            entity.getId().equals("user1@flow_name/1002345678919") &&
+            entity.getRunId() == 1002345678919L &&
+            entity.getStartTime() == 1425016501000L);
+        assertEquals(0, entity.getMetrics().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowRunEntity entity : entities) {
+        assertTrue("Id, run id or start time does not match.",
+            ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+            (entity.getRunId() == 1002345678919L) &&
+            (entity.getStartTime() == 1425016501000L) &&
+            (entity.getMetrics().size() == 2)) ||
+            ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+            (entity.getRunId() == 1002345678920L) &&
+            (entity.getStartTime() == 1425016501034L) &&
+            (entity.getMetrics().size() == 0)));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
   @Test
   public void testGetFlows() throws Exception {
     Client client = createClient();