YARN-4221. Store user in app to flow table (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2015-10-23 22:07:00 -07:00
parent 10ec5586fb
commit 09649005ca
7 changed files with 145 additions and 91 deletions

View File

@ -265,11 +265,6 @@ public class TimelineReaderWebServices {
return str == null ? null : str.trim(); return str == null ? null : str.trim();
} }
private static String parseUser(UserGroupInformation callerUGI, String user) {
return (callerUGI != null && (user == null || user.isEmpty()) ?
callerUGI.getUserName().trim() : parseStr(user));
}
private static UserGroupInformation getUser(HttpServletRequest req) { private static UserGroupInformation getUser(HttpServletRequest req) {
String remoteUser = req.getRemoteUser(); String remoteUser = req.getRemoteUser();
UserGroupInformation callerUGI = null; UserGroupInformation callerUGI = null;
@ -389,7 +384,7 @@ public class TimelineReaderWebServices {
Set<TimelineEntity> entities = null; Set<TimelineEntity> entities = null;
try { try {
entities = timelineReaderManager.getEntities( entities = timelineReaderManager.getEntities(
parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), parseStr(userId), parseStr(clusterId), parseStr(flowId),
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType), parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
parseLongStr(limit), parseLongStr(createdTimeStart), parseLongStr(limit), parseLongStr(createdTimeStart),
parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart), parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
@ -463,7 +458,7 @@ public class TimelineReaderWebServices {
TimelineEntity entity = null; TimelineEntity entity = null;
try { try {
entity = timelineReaderManager.getEntity( entity = timelineReaderManager.getEntity(
parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), parseStr(userId), parseStr(clusterId), parseStr(flowId),
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType), parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER)); parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) { } catch (Exception e) {
@ -482,35 +477,35 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a single flow run for the given cluster, flow id and run id. * Return a single flow run for the given user, flow id and run id.
* Cluster ID is not provided by client so default cluster ID has to be taken. * Cluster ID is not provided by client so default cluster ID has to be taken.
*/ */
@GET @GET
@Path("/flowrun/{flowid}/{flowrunid}/") @Path("/flowrun/{userid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getFlowRun( public TimelineEntity getFlowRun(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId, @PathParam("flowrunid") String flowRunId,
@QueryParam("userid") String userId,
@QueryParam("fields") String fields) { @QueryParam("fields") String fields) {
return getFlowRun(req, res, null, flowId, flowRunId, userId, fields); return getFlowRun(req, res, userId, null, flowId, flowRunId, fields);
} }
/** /**
* Return a single flow run for the given cluster, flow id and run id. * Return a single flow run for the given user, cluster, flow id and run id.
*/ */
@GET @GET
@Path("/flowrun/{clusterid}/{flowid}/{flowrunid}/") @Path("/flowrun/{userid}/{clusterid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getFlowRun( public TimelineEntity getFlowRun(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("clusterid") String clusterId, @PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId, @PathParam("flowrunid") String flowRunId,
@QueryParam("userid") String userId,
@QueryParam("fields") String fields) { @QueryParam("fields") String fields) {
String url = req.getRequestURI() + String url = req.getRequestURI() +
(req.getQueryString() == null ? "" : (req.getQueryString() == null ? "" :
@ -522,9 +517,8 @@ public class TimelineReaderWebServices {
TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
TimelineEntity entity = null; TimelineEntity entity = null;
try { try {
entity = timelineReaderManager.getEntity( entity = timelineReaderManager.getEntity(parseStr(userId),
parseUser(callerUGI, userId), parseStr(clusterId), parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), null,
parseStr(flowId), parseLongStr(flowRunId), null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, TimelineEntityType.YARN_FLOW_RUN.toString(), null,
parseFieldsStr(fields, COMMA_DELIMITER)); parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) { } catch (Exception e) {
@ -543,37 +537,37 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a set of flows runs for the given flow id. * Return a set of flows runs for the given user and flow id.
* Cluster ID is not provided by client so default cluster ID has to be taken. * Cluster ID is not provided by client so default cluster ID has to be taken.
*/ */
@GET @GET
@Path("/flowruns/{flowid}/") @Path("/flowruns/{userid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRuns( public Set<TimelineEntity> getFlowRuns(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@QueryParam("userid") String userId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("fields") String fields) { @QueryParam("fields") String fields) {
return getFlowRuns(req, res, null, flowId, userId, limit, createdTimeStart, return getFlowRuns(req, res, userId, null, flowId, limit, createdTimeStart,
createdTimeEnd, fields); createdTimeEnd, fields);
} }
/** /**
* Return a set of flow runs for the given cluster and flow id. * Return a set of flow runs for the given user, cluster and flow id.
*/ */
@GET @GET
@Path("/flowruns/{clusterid}/{flowid}/") @Path("/flowruns/{userid}/{clusterid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRuns( public Set<TimelineEntity> getFlowRuns(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("clusterid") String clusterId, @PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@QueryParam("userid") String userId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,
@ -589,11 +583,11 @@ public class TimelineReaderWebServices {
Set<TimelineEntity> entities = null; Set<TimelineEntity> entities = null;
try { try {
entities = timelineReaderManager.getEntities( entities = timelineReaderManager.getEntities(
parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId), parseStr(userId), parseStr(clusterId), parseStr(flowId), null, null,
null, null, TimelineEntityType.YARN_FLOW_RUN.toString(), TimelineEntityType.YARN_FLOW_RUN.toString(), parseLongStr(limit),
parseLongStr(limit), parseLongStr(createdTimeStart), parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), null,
parseLongStr(createdTimeEnd), null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, parseFieldsStr(fields, COMMA_DELIMITER)); parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "createdTime start/end or limit"); handleException(e, url, startTime, "createdTime start/end or limit");
} }
@ -730,10 +724,9 @@ public class TimelineReaderWebServices {
TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
TimelineEntity entity = null; TimelineEntity entity = null;
try { try {
entity = timelineReaderManager.getEntity( entity = timelineReaderManager.getEntity(parseStr(userId),
parseUser(callerUGI, userId), parseStr(clusterId), parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId),
parseStr(flowId), parseLongStr(flowRunId), parseStr(appId), parseStr(appId), TimelineEntityType.YARN_APPLICATION.toString(), null,
TimelineEntityType.YARN_APPLICATION.toString(), null,
parseFieldsStr(fields, COMMA_DELIMITER)); parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) { } catch (Exception e) {
handleException(e, url, startTime, "flowrunid"); handleException(e, url, startTime, "flowrunid");
@ -750,20 +743,20 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a list of apps for given flow id and flow run id. Cluster ID is not * Return a list of apps for given user, flow id and flow run id. Cluster ID
* provided by client so default cluster ID has to be taken. If number of * is not provided by client so default cluster ID has to be taken. If number
* matching apps are more than the limit, most recent apps till the limit is * of matching apps are more than the limit, most recent apps till the limit
* reached, will be returned. * is reached, will be returned.
*/ */
@GET @GET
@Path("/flowrunapps/{flowid}/{flowrunid}/") @Path("/flowrunapps/{userid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRunApps( public Set<TimelineEntity> getFlowRunApps(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId, @PathParam("flowrunid") String flowRunId,
@QueryParam("userid") String userId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,
@ -784,20 +777,20 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a list of apps for a given cluster id, flow id and flow run id. If * Return a list of apps for a given user, cluster id, flow id and flow run
* number of matching apps are more than the limit, most recent apps till the * id. If number of matching apps are more than the limit, most recent apps
* limit is reached, will be returned. * till the limit is reached, will be returned.
*/ */
@GET @GET
@Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/") @Path("/flowrunapps/{userid}/{clusterid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowRunApps( public Set<TimelineEntity> getFlowRunApps(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("clusterid") String clusterId, @PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId, @PathParam("flowrunid") String flowRunId,
@QueryParam("userid") String userId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,
@ -818,19 +811,19 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a list of apps for given flow id. Cluster ID is not provided by * Return a list of apps for given user and flow id. Cluster ID is not
* client so default cluster ID has to be taken. If number of matching apps * provided by client so default cluster ID has to be taken. If number of
* are more than the limit, most recent apps till the limit is reached, will * matching apps are more than the limit, most recent apps till the limit is
* be returned. * reached, will be returned.
*/ */
@GET @GET
@Path("/flowapps/{flowid}/") @Path("/flowapps/{userid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowApps( public Set<TimelineEntity> getFlowApps(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@QueryParam("userid") String userId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,
@ -851,19 +844,19 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a list of apps for a given cluster id and flow id. If number of * Return a list of apps for a given user, cluster id and flow id. If number
* matching apps are more than the limit, most recent apps till the limit is * of matching apps are more than the limit, most recent apps till the limit
* reached, will be returned. * is reached, will be returned.
*/ */
@GET @GET
@Path("/flowapps/{clusterid}/{flowid}/") @Path("/flowapps/{userid}/{clusterid}/{flowid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlowApps( public Set<TimelineEntity> getFlowApps(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("userid") String userId,
@PathParam("clusterid") String clusterId, @PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId, @PathParam("flowid") String flowId,
@QueryParam("userid") String userId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,

View File

@ -90,12 +90,12 @@ class ApplicationEntityReader extends GenericEntityReader {
@Override @Override
protected void validateParams() { protected void validateParams() {
Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
if (singleEntityRead) { if (singleEntityRead) {
Preconditions.checkNotNull(appId, "appId shouldn't be null"); Preconditions.checkNotNull(appId, "appId shouldn't be null");
} else { } else {
Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
} }
} }
@ -104,11 +104,12 @@ class ApplicationEntityReader extends GenericEntityReader {
protected void augmentParams(Configuration hbaseConf, Connection conn) protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException { throws IOException {
if (singleEntityRead) { if (singleEntityRead) {
if (flowId == null || flowRunId == null) { if (flowId == null || flowRunId == null || userId == null) {
FlowContext context = FlowContext context =
lookupFlowContext(clusterId, appId, hbaseConf, conn); lookupFlowContext(clusterId, appId, hbaseConf, conn);
flowId = context.flowId; flowId = context.flowId;
flowRunId = context.flowRunId; flowRunId = context.flowRunId;
userId = context.userId;
} }
} }
if (fieldsToRetrieve == null) { if (fieldsToRetrieve == null) {

View File

@ -100,6 +100,7 @@ class GenericEntityReader extends TimelineEntityReader {
Result result = appToFlowTable.getResult(hbaseConf, conn, get); Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) { if (result != null && !result.isEmpty()) {
return new FlowContext( return new FlowContext(
AppToFlowColumn.USER_ID.readResult(result).toString(),
AppToFlowColumn.FLOW_ID.readResult(result).toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
} else { } else {
@ -110,9 +111,11 @@ class GenericEntityReader extends TimelineEntityReader {
} }
protected static class FlowContext { protected static class FlowContext {
protected final String userId;
protected final String flowId; protected final String flowId;
protected final Long flowRunId; protected final Long flowRunId;
public FlowContext(String flowId, Long flowRunId) { public FlowContext(String user, String flowId, Long flowRunId) {
this.userId = user;
this.flowId = flowId; this.flowId = flowId;
this.flowRunId = flowRunId; this.flowRunId = flowRunId;
} }
@ -120,7 +123,6 @@ class GenericEntityReader extends TimelineEntityReader {
@Override @Override
protected void validateParams() { protected void validateParams() {
Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(appId, "appId shouldn't be null"); Preconditions.checkNotNull(appId, "appId shouldn't be null");
Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
@ -132,12 +134,13 @@ class GenericEntityReader extends TimelineEntityReader {
@Override @Override
protected void augmentParams(Configuration hbaseConf, Connection conn) protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException { throws IOException {
// In reality both should be null or neither should be null // In reality all three should be null or neither should be null
if (flowId == null || flowRunId == null) { if (flowId == null || flowRunId == null || userId == null) {
FlowContext context = FlowContext context =
lookupFlowContext(clusterId, appId, hbaseConf, conn); lookupFlowContext(clusterId, appId, hbaseConf, conn);
flowId = context.flowId; flowId = context.flowId;
flowRunId = context.flowRunId; flowRunId = context.flowRunId;
userId = context.userId;
} }
if (fieldsToRetrieve == null) { if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class); fieldsToRetrieve = EnumSet.noneOf(Field.class);

View File

@ -162,8 +162,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
String flowName, String flowVersion, long flowRunId, String appId, String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException { TimelineEntity te) throws IOException {
// store in App to flow table // store in App to flow table
storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId, storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
appId, te);
// store in flow run table // store in flow run table
storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te); flowRunId, appId, te);
@ -200,11 +199,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
} }
private void storeInAppToFlowTable(String clusterId, String userId, private void storeInAppToFlowTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId, String flowName, long flowRunId, String appId, TimelineEntity te)
TimelineEntity te) throws IOException { throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
} }
/* /*

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBuffere
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
/** /**
* Identifies fully qualified columns for the {@link AppToFlowTable}. * Identifies fully qualified columns for the {@link AppToFlowTable}.
@ -43,7 +42,12 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> {
/** /**
* The flow run ID * The flow run ID
*/ */
FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"); FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
/**
* The user
*/
USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
private final ColumnHelper<AppToFlowTable> column; private final ColumnHelper<AppToFlowTable> column;
private final ColumnFamily<AppToFlowTable> columnFamily; private final ColumnFamily<AppToFlowTable> columnFamily;

View File

@ -49,6 +49,9 @@ import java.io.IOException;
* | | flowRunId: | * | | flowRunId: |
* | | 1452828720457 | * | | 1452828720457 |
* | | | * | | |
* | | user_id: |
* | | admin |
* | | |
* | | | * | | |
* | | | * | | |
* |--------------------------------------| * |--------------------------------------|

View File

@ -199,6 +199,18 @@ public class TestTimelineReaderWebServicesHBaseStorage {
entity4.addEvent(event4); entity4.addEvent(event4);
te4.addEntity(entity4); te4.addEntity(entity4);
TimelineEntities te5 = new TimelineEntities();
TimelineEntity entity5 = new TimelineEntity();
entity5.setId("entity1");
entity5.setType("type1");
entity5.setCreatedTime(1425016501034L);
te5.addEntity(entity5);
TimelineEntity entity6 = new TimelineEntity();
entity6.setId("entity2");
entity6.setType("type1");
entity6.setCreatedTime(1425016501034L);
te5.addEntity(entity6);
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration(); Configuration c1 = util.getConfiguration();
try { try {
@ -209,6 +221,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
hbi.write(cluster, user, flow2, hbi.write(cluster, user, flow2,
flowVersion2, runid2, entity3.getId(), te3); flowVersion2, runid2, entity3.getId(), te3);
hbi.write(cluster, user, flow, flowVersion, runid,
"application_1111111111_1111", te5);
hbi.flush(); hbi.flush();
} finally { } finally {
hbi.close(); hbi.close();
@ -333,7 +347,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1"); "timeline/flowrun/user1/cluster1/flow_name/1002345678919");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -350,7 +364,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
// Query without specifying cluster ID. // Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrun/flow_name/1002345678919?userid=user1"); "timeline/flowrun/user1/flow_name/1002345678919");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entity = resp.getEntity(FlowRunEntity.class); entity = resp.getEntity(FlowRunEntity.class);
assertNotNull(entity); assertNotNull(entity);
@ -374,7 +388,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowruns/cluster1/flow_name?userid=user1"); "timeline/flowruns/user1/cluster1/flow_name");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<FlowRunEntity> entities = Set<FlowRunEntity> entities =
resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@ -393,7 +407,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
} }
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowruns/cluster1/flow_name?userid=user1&limit=1"); "timeline/flowruns/user1/cluster1/flow_name?limit=1");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -408,7 +422,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
} }
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowruns/cluster1/flow_name?userid=user1&" + "timeline/flowruns/user1/cluster1/flow_name?" +
"createdtimestart=1425016501030"); "createdtimestart=1425016501030");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@ -424,7 +438,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
} }
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowruns/cluster1/flow_name?userid=user1&" + "timeline/flowruns/user1/cluster1/flow_name?" +
"createdtimestart=1425016500999&createdtimeend=1425016501035"); "createdtimestart=1425016500999&createdtimeend=1425016501035");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@ -443,7 +457,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
} }
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowruns/cluster1/flow_name?userid=user1&" + "timeline/flowruns/user1/cluster1/flow_name?" +
"createdtimeend=1425016501030"); "createdtimeend=1425016501030");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
@ -459,7 +473,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
} }
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowruns/cluster1/flow_name?userid=user1&fields=metrics"); "timeline/flowruns/user1/cluster1/flow_name?fields=metrics");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -620,7 +634,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/app/cluster1/application_1111111111_1111?" + "timeline/app/cluster1/application_1111111111_1111?" +
"userid=user1&fields=ALL"); "fields=ALL");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class); TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity); assertNotNull(entity);
@ -640,13 +654,49 @@ public class TestTimelineReaderWebServicesHBaseStorage {
} }
} }
@Test
public void testGetEntityWithoutFlowInfo() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entity/cluster1/application_1111111111_1111/type1/entity1");
ClientResponse resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertNotNull(entity);
assertEquals("entity1", entity.getId());
assertEquals("type1", entity.getType());
} finally {
client.destroy();
}
}
@Test
public void testGetEntitiesWithoutFlowInfo() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entities/cluster1/application_1111111111_1111/type1");
ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (TimelineEntity entity : entities) {
assertTrue(entity.getId().equals("entity1") ||
entity.getId().equals("entity2"));
}
} finally {
client.destroy();
}
}
@Test @Test
public void testGetFlowRunApps() throws Exception { public void testGetFlowRunApps() throws Exception {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrunapps/cluster1/flow_name/1002345678919?" + "timeline/flowrunapps/user1/cluster1/flow_name/1002345678919?" +
"userid=user1&fields=ALL"); "fields=ALL");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -662,14 +712,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
// Query without specifying cluster ID. // Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrunapps/flow_name/1002345678919?userid=user1"); "timeline/flowrunapps/user1/flow_name/1002345678919");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities); assertNotNull(entities);
assertEquals(2, entities.size()); assertEquals(2, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1"); "timeline/flowrunapps/user1/flow_name/1002345678919?limit=1");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities); assertNotNull(entities);
@ -684,7 +734,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL"); "timeline/flowapps/user1/cluster1/flow_name?fields=ALL");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -702,14 +752,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
// Query without specifying cluster ID. // Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/flow_name?userid=user1"); "timeline/flowapps/user1/flow_name");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities); assertNotNull(entities);
assertEquals(3, entities.size()); assertEquals(3, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/flow_name?userid=user1&limit=1"); "timeline/flowapps/user1/flow_name?limit=1");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertNotNull(entities); assertNotNull(entities);
@ -725,7 +775,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
try { try {
String entityType = TimelineEntityType.YARN_APPLICATION.toString(); String entityType = TimelineEntityType.YARN_APPLICATION.toString();
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" + "timeline/flowapps/user1/cluster1/flow_name?eventfilters=" +
ApplicationMetricsConstants.FINISHED_EVENT_TYPE); ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
@ -736,7 +786,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
newEntity(entityType, "application_1111111111_1111"))); newEntity(entityType, "application_1111111111_1111")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" + "timeline/flowapps/user1/cluster1/flow_name?metricfilters=" +
"HDFS_BYTES_READ"); "HDFS_BYTES_READ");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -746,7 +796,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
newEntity(entityType, "application_1111111111_1111"))); newEntity(entityType, "application_1111111111_1111")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" + "timeline/flowapps/user1/cluster1/flow_name?conffilters=" +
"cfg1:value1"); "cfg1:value1");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -764,7 +814,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1"); "timeline/flowrun/user1/cluster1/flow_name/1002345678929");
verifyHttpResponse(client, uri, Status.NOT_FOUND); verifyHttpResponse(client, uri, Status.NOT_FOUND);
} finally { } finally {
client.destroy(); client.destroy();
@ -793,8 +843,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/app/cluster1/flow_name/1002345678919/" + "timeline/app/user1/cluster1/flow_name/1002345678919/" +
"application_1111111111_1378?userid=user1"); "application_1111111111_1378");
verifyHttpResponse(client, uri, Status.NOT_FOUND); verifyHttpResponse(client, uri, Status.NOT_FOUND);
} finally { } finally {
client.destroy(); client.destroy();
@ -806,7 +856,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrunapps/cluster2/flow_name/1002345678919"); "timeline/flowrunapps/user1/cluster2/flow_name/1002345678919");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -823,7 +873,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowapps/cluster2/flow_name55"); "timeline/flowapps/user1/cluster2/flow_name55");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});