YARN-4075 [reader REST API] implement support for querying for flows and flow runs (Varun Saxena via vrushali)

This commit is contained in:
Vrushali Channapattan 2015-09-25 12:16:38 -07:00 committed by Sangjin Lee
parent 10fa6da7d8
commit d95dc89a02
9 changed files with 687 additions and 56 deletions

View File

@ -154,4 +154,14 @@ public class TimelineMetric {
} }
return true; return true;
} }
@Override
public String toString() {
String str = "{id:" + id + ", type:" + type;
if (!values.isEmpty()) {
str += ", values:" + values;
}
str += "}";
return str;
}
} }

View File

@ -25,9 +25,10 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -42,6 +43,22 @@ public class TimelineReaderManager extends AbstractService {
this.reader = timelineReader; this.reader = timelineReader;
} }
/**
* Gets cluster ID from config yarn.resourcemanager.cluster-id
* if not supplied by client.
* @param clusterId
* @param conf
* @return clusterId
*/
private static String getClusterID(String clusterId, Configuration conf) {
if (clusterId == null || clusterId.isEmpty()) {
return conf.get(
YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
}
return clusterId;
}
/** /**
* Get a set of entities matching given predicates. The meaning of each * Get a set of entities matching given predicates. The meaning of each
* argument has been documented with {@link TimelineReader#getEntities}. * argument has been documented with {@link TimelineReader#getEntities}.
@ -56,7 +73,8 @@ public class TimelineReaderManager extends AbstractService {
Map<String, Object> infoFilters, Map<String, String> configFilters, Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters, Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) throws IOException { EnumSet<Field> fieldsToRetrieve) throws IOException {
return reader.getEntities(userId, clusterId, flowId, flowRunId, appId, String cluster = getClusterID(clusterId, getConfig());
return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
metricFilters, eventFilters, fieldsToRetrieve); metricFilters, eventFilters, fieldsToRetrieve);
@ -71,7 +89,8 @@ public class TimelineReaderManager extends AbstractService {
public TimelineEntity getEntity(String userId, String clusterId, public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType, String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fields) throws IOException { String entityId, EnumSet<Field> fields) throws IOException {
return reader.getEntity(userId, clusterId, flowId, flowRunId, appId, String cluster = getClusterID(clusterId, getConfig());
return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
entityType, entityId, fields); entityType, entityId, fields);
} }
} }

View File

@ -77,6 +77,7 @@ public class TimelineReaderServer extends CompositeService {
TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass( TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
FileSystemTimelineReaderImpl.class, TimelineReader.class), conf); FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
LOG.info("Using store " + readerStore.getClass().getName());
readerStore.init(conf); readerStore.init(conf);
return readerStore; return readerStore;
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.timelineservice.reader; package org.apache.hadoop.yarn.server.timelineservice.reader;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -45,6 +46,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -169,6 +171,11 @@ public class TimelineReaderWebServices {
return str == null ? null : str.trim(); return str == null ? null : str.trim();
} }
private static String parseUser(UserGroupInformation callerUGI, String user) {
return (callerUGI != null && (user == null || user.isEmpty()) ?
callerUGI.getUserName().trim() : parseStr(user));
}
private static UserGroupInformation getUser(HttpServletRequest req) { private static UserGroupInformation getUser(HttpServletRequest req) {
String remoteUser = req.getRemoteUser(); String remoteUser = req.getRemoteUser();
UserGroupInformation callerUGI = null; UserGroupInformation callerUGI = null;
@ -183,6 +190,17 @@ public class TimelineReaderWebServices {
ctxt.getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR); ctxt.getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR);
} }
private static void handleException(Exception e) throws BadRequestException,
WebApplicationException {
if (e instanceof IllegalArgumentException) {
throw new BadRequestException("Requested Invalid Field.");
} else {
LOG.error("Error while processing REST request", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
}
/** /**
* Return the description of the timeline reader web services. * Return the description of the timeline reader web services.
*/ */
@ -196,25 +214,58 @@ public class TimelineReaderWebServices {
} }
/** /**
* Return a set of entities that match the given parameters. * Return a set of entities that match the given parameters. Cluster ID is not
* provided by client so default cluster ID has to be taken.
*/ */
@GET @GET
@Path("/entities/{clusterId}/{appId}/{entityType}") @Path("/entities/{appid}/{entitytype}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getEntities( public Set<TimelineEntity> getEntities(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("clusterId") String clusterId, @PathParam("appid") String appId,
@PathParam("appId") String appId, @PathParam("entitytype") String entityType,
@PathParam("entityType") String entityType, @QueryParam("userid") String userId,
@QueryParam("userId") String userId, @QueryParam("flowid") String flowId,
@QueryParam("flowId") String flowId, @QueryParam("flowrunid") String flowRunId,
@QueryParam("flowRunId") String flowRunId,
@QueryParam("limit") String limit, @QueryParam("limit") String limit,
@QueryParam("createdTimeStart") String createdTimeStart, @QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdTimeEnd") String createdTimeEnd, @QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("modifiedTimeStart") String modifiedTimeStart, @QueryParam("modifiedtimestart") String modifiedTimeStart,
@QueryParam("modifiedTimeEnd") String modifiedTimeEnd, @QueryParam("modifiedtimeend") String modifiedTimeEnd,
@QueryParam("relatesto") String relatesTo,
@QueryParam("isrelatedto") String isRelatedTo,
@QueryParam("infofilters") String infofilters,
@QueryParam("conffilters") String conffilters,
@QueryParam("metricfilters") String metricfilters,
@QueryParam("eventfilters") String eventfilters,
@QueryParam("fields") String fields) {
return getEntities(req, res, null, appId, entityType, userId, flowId,
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
metricfilters, eventfilters, fields);
}
/**
* Return a set of entities that match the given parameters.
*/
@GET
@Path("/entities/{clusterid}/{appid}/{entitytype}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getEntities(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("clusterid") String clusterId,
@PathParam("appid") String appId,
@PathParam("entitytype") String entityType,
@QueryParam("userid") String userId,
@QueryParam("flowid") String flowId,
@QueryParam("flowrunid") String flowRunId,
@QueryParam("limit") String limit,
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("modifiedtimestart") String modifiedTimeStart,
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
@QueryParam("relatesto") String relatesTo, @QueryParam("relatesto") String relatesTo,
@QueryParam("isrelatedto") String isRelatedTo, @QueryParam("isrelatedto") String isRelatedTo,
@QueryParam("infofilters") String infofilters, @QueryParam("infofilters") String infofilters,
@ -225,11 +276,10 @@ public class TimelineReaderWebServices {
init(res); init(res);
TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
UserGroupInformation callerUGI = getUser(req); UserGroupInformation callerUGI = getUser(req);
Set<TimelineEntity> entities = null;
try { try {
return timelineReaderManager.getEntities( entities = timelineReaderManager.getEntities(
callerUGI != null && (userId == null || userId.isEmpty()) ? parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
callerUGI.getUserName().trim() : parseStr(userId),
parseStr(clusterId), parseStr(flowId),
parseLongStr(flowRunId), parseStr(appId), parseStr(entityType), parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
parseLongStr(limit), parseLongStr(createdTimeStart), parseLongStr(limit), parseLongStr(createdTimeStart),
parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart), parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
@ -245,31 +295,52 @@ public class TimelineReaderWebServices {
throw new BadRequestException( throw new BadRequestException(
"createdTime or modifiedTime start/end or limit or flowId is not" + "createdTime or modifiedTime start/end or limit or flowId is not" +
" a numeric value."); " a numeric value.");
} catch (IllegalArgumentException e) {
throw new BadRequestException("Requested Invalid Field.");
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error getting entities", e); handleException(e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
} }
if (entities == null) {
entities = Collections.emptySet();
}
return entities;
}
/**
* Return a single entity of the given entity type and Id. Cluster ID is not
* provided by client so default cluster ID has to be taken.
*/
@GET
@Path("/entity/{appid}/{entitytype}/{entityid}/")
@Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getEntity(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("appid") String appId,
@PathParam("entitytype") String entityType,
@PathParam("entityid") String entityId,
@QueryParam("userid") String userId,
@QueryParam("flowid") String flowId,
@QueryParam("flowrunid") String flowRunId,
@QueryParam("fields") String fields) {
return getEntity(req, res, null, appId, entityType, entityId, userId,
flowId, flowRunId, fields);
} }
/** /**
* Return a single entity of the given entity type and Id. * Return a single entity of the given entity type and Id.
*/ */
@GET @GET
@Path("/entity/{clusterId}/{appId}/{entityType}/{entityId}/") @Path("/entity/{clusterid}/{appid}/{entitytype}/{entityid}/")
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getEntity( public TimelineEntity getEntity(
@Context HttpServletRequest req, @Context HttpServletRequest req,
@Context HttpServletResponse res, @Context HttpServletResponse res,
@PathParam("clusterId") String clusterId, @PathParam("clusterid") String clusterId,
@PathParam("appId") String appId, @PathParam("appid") String appId,
@PathParam("entityType") String entityType, @PathParam("entitytype") String entityType,
@PathParam("entityId") String entityId, @PathParam("entityid") String entityId,
@QueryParam("userId") String userId, @QueryParam("userid") String userId,
@QueryParam("flowId") String flowId, @QueryParam("flowid") String flowId,
@QueryParam("flowRunId") String flowRunId, @QueryParam("flowrunid") String flowRunId,
@QueryParam("fields") String fields) { @QueryParam("fields") String fields) {
init(res); init(res);
TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
@ -277,19 +348,13 @@ public class TimelineReaderWebServices {
TimelineEntity entity = null; TimelineEntity entity = null;
try { try {
entity = timelineReaderManager.getEntity( entity = timelineReaderManager.getEntity(
callerUGI != null && (userId == null || userId.isEmpty()) ? parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
callerUGI.getUserName().trim() : parseStr(userId), parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId), parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER));
parseStr(appId), parseStr(entityType), parseStr(entityId),
parseFieldsStr(fields, COMMA_DELIMITER));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new BadRequestException("flowRunId is not a numeric value."); throw new BadRequestException("flowrunid is not a numeric value.");
} catch (IllegalArgumentException e) {
throw new BadRequestException("Requested Invalid Field.");
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error getting entity", e); handleException(e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
} }
if (entity == null) { if (entity == null) {
throw new NotFoundException("Timeline entity {id: " + parseStr(entityId) + throw new NotFoundException("Timeline entity {id: " + parseStr(entityId) +
@ -297,4 +362,104 @@ public class TimelineReaderWebServices {
} }
return entity; return entity;
} }
/**
* Return a single flow run for the given cluster, flow id and run id.
* Cluster ID is not provided by client so default cluster ID has to be taken.
*/
@GET
@Path("/flowrun/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getFlowRun(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId,
@QueryParam("userid") String userId,
@QueryParam("fields") String fields) {
return getFlowRun(req, res, null, flowId, flowRunId, userId, fields);
}
/**
* Return a single flow run for the given cluster, flow id and run id.
*/
@GET
@Path("/flowrun/{clusterid}/{flowid}/{flowrunid}/")
@Produces(MediaType.APPLICATION_JSON)
public TimelineEntity getFlowRun(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("clusterid") String clusterId,
@PathParam("flowid") String flowId,
@PathParam("flowrunid") String flowRunId,
@QueryParam("userid") String userId,
@QueryParam("fields") String fields) {
init(res);
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
UserGroupInformation callerUGI = getUser(req);
TimelineEntity entity = null;
try {
entity = timelineReaderManager.getEntity(
parseUser(callerUGI, userId), parseStr(clusterId),
parseStr(flowId), parseLongStr(flowRunId), null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null,
parseFieldsStr(fields, COMMA_DELIMITER));
} catch (NumberFormatException e) {
throw new BadRequestException("flowRunId is not a numeric value.");
} catch (Exception e) {
handleException(e);
}
if (entity == null) {
throw new NotFoundException("Flow run {flow id: " + parseStr(flowId) +
", run id: " + parseLongStr(flowRunId) + " } is not found");
}
return entity;
}
/**
* Return a list of flows for a given cluster id. Cluster ID is not
* provided by client so default cluster ID has to be taken.
*/
@GET
@Path("/flows/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlows(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@QueryParam("limit") String limit,
@QueryParam("fields") String fields) {
return getFlows(req, res, null, limit, fields);
}
/**
* Return a list of flows for a given cluster id.
*/
@GET
@Path("/flows/{clusterid}/")
@Produces(MediaType.APPLICATION_JSON)
public Set<TimelineEntity> getFlows(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("clusterid") String clusterId,
@QueryParam("limit") String limit,
@QueryParam("fields") String fields) {
init(res);
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
Set<TimelineEntity> entities = null;
try {
entities = timelineReaderManager.getEntities(
null, parseStr(clusterId), null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), parseLongStr(limit),
null, null, null, null, null, null, null, null, null, null,
parseFieldsStr(fields, COMMA_DELIMITER));
} catch (NumberFormatException e) {
throw new BadRequestException("limit is not a numeric value.");
} catch (Exception e) {
handleException(e);
}
if (entities == null) {
entities = Collections.emptySet();
}
return entities;
}
} }

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.reader;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import com.google.common.annotations.VisibleForTesting;
/** /**
* This creates the schema for a hbase based backend for storing application * This creates the schema for a hbase based backend for storing application
* timeline information. * timeline information.
@ -201,6 +203,7 @@ public class TimelineSchemaCreator {
return commandLine; return commandLine;
} }
@VisibleForTesting
public static void createAllTables(Configuration hbaseConf, public static void createAllTables(Configuration hbaseConf,
boolean skipExisting) throws IOException { boolean skipExisting) throws IOException {

View File

@ -187,8 +187,8 @@ public class TestTimelineReaderWebServices {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entity/cluster1/app1/app/id_1?userId=user1&" + "timeline/entity/cluster1/app1/app/id_1?userid=user1&" +
"flowId=flow1&flowRunId=1"); "flowid=flow1&flowrunid=1");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class); TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -258,6 +258,32 @@ public class TestTimelineReaderWebServices {
} }
} }
@Test
public void testQueryWithoutCluster() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entity/app1/app/id_1");
ClientResponse resp = getResponse(client, uri);
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
assertNotNull(entity);
assertEquals("id_1", entity.getId());
assertEquals("app", entity.getType());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entities/app1/app");
resp = getResponse(client, uri);
Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
assertNotNull(entities);
assertEquals(4, entities.size());
} finally {
client.destroy();
}
}
@Test @Test
public void testGetEntities() throws Exception { public void testGetEntities() throws Exception {
Client client = createClient(); Client client = createClient();
@ -318,8 +344,8 @@ public class TestTimelineReaderWebServices {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entities/cluster1/app1/app?createdTimeStart=1425016502030&" "timeline/entities/cluster1/app1/app?createdtimestart=1425016502030&"
+ "createdTimeEnd=1425016502060"); + "createdtimeend=1425016502060");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -330,7 +356,7 @@ public class TestTimelineReaderWebServices {
entities.contains(newEntity("app", "id_4"))); entities.contains(newEntity("app", "id_4")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
"entities/cluster1/app1/app?createdTimeEnd=1425016502010"); "entities/cluster1/app1/app?createdtimeend=1425016502010");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -340,7 +366,7 @@ public class TestTimelineReaderWebServices {
entities.contains(newEntity("app", "id_4"))); entities.contains(newEntity("app", "id_4")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
"entities/cluster1/app1/app?createdTimeStart=1425016502010"); "entities/cluster1/app1/app?createdtimestart=1425016502010");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -358,8 +384,8 @@ public class TestTimelineReaderWebServices {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entities/cluster1/app1/app?modifiedTimeStart=1425016502090" "timeline/entities/cluster1/app1/app?modifiedtimestart=1425016502090"
+ "&modifiedTimeEnd=1425016503020"); + "&modifiedtimeend=1425016503020");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -372,7 +398,7 @@ public class TestTimelineReaderWebServices {
entities.contains(newEntity("app", "id_4"))); entities.contains(newEntity("app", "id_4")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
"entities/cluster1/app1/app?modifiedTimeEnd=1425016502090"); "entities/cluster1/app1/app?modifiedtimeend=1425016502090");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -384,7 +410,7 @@ public class TestTimelineReaderWebServices {
entities.contains(newEntity("app", "id_3"))); entities.contains(newEntity("app", "id_3")));
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
"entities/cluster1/app1/app?modifiedTimeStart=1425016503005"); "entities/cluster1/app1/app?modifiedtimestart=1425016503005");
resp = getResponse(client, uri); resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType()); assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@ -527,7 +553,7 @@ public class TestTimelineReaderWebServices {
"timeline/entities/cluster1/app1/app?metricfilters=metric7&" + "timeline/entities/cluster1/app1/app?metricfilters=metric7&" +
"isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" + "isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" +
"flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" + "flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" +
"&createdTimeStart=1425016502030&createdTimeEnd=1425016502060"); "&createdtimestart=1425016502030&createdtimeend=1425016502060");
ClientResponse resp = getResponse(client, uri); ClientResponse resp = getResponse(client, uri);
Set<TimelineEntity> entities = Set<TimelineEntity> entities =
resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@ -544,11 +570,11 @@ public class TestTimelineReaderWebServices {
Client client = createClient(); Client client = createClient();
try { try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/entities/cluster1/app1/app?flowRunId=a23b"); "timeline/entities/cluster1/app1/app?flowrunid=a23b");
verifyHttpResponse(client, uri, Status.BAD_REQUEST); verifyHttpResponse(client, uri, Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
"entity/cluster1/app1/app/id_1?flowRunId=2ab15"); "entity/cluster1/app1/app/id_1?flowrunid=2ab15");
verifyHttpResponse(client, uri, Status.BAD_REQUEST); verifyHttpResponse(client, uri, Status.BAD_REQUEST);
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" + uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +

View File

@ -0,0 +1,365 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
public class TestTimelineReaderWebServicesFlowRun {
private int serverPort;
private TimelineReaderServer server;
private static HBaseTestingUtility util;
private static long ts = System.currentTimeMillis();
@BeforeClass
public static void setup() throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setInt("hfile.format.version", 3);
util.startMiniCluster();
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
loadData();
}
private static void loadData() throws Exception {
String cluster = "cluster1";
String user = "user1";
String flow = "flow_name";
String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L;
Long runid1 = 1002345678920L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
Long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS");
Map<Long, Number> metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 100000, 2);
metricValues.put(ts - 80000, 40);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
m1 = new TimelineMetric();
m1.setId("HDFS_BYTES_READ");
metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 100000, 31);
metricValues.put(ts - 80000, 57);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
entity.addMetrics(metrics);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
Long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
entity.addEvent(event);
te.addEntity(entity);
// write another application with same metric to this flow
TimelineEntities te1 = new TimelineEntities();
TimelineEntity entity1 = new TimelineEntity();
id = "flowRunMetrics_test";
type = TimelineEntityType.YARN_APPLICATION.toString();
entity1.setId(id);
entity1.setType(type);
cTime = 1425016501000L;
entity1.setCreatedTime(cTime);
// add metrics
metrics.clear();
TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP_SLOT_MILLIS");
metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 100000, 5L);
metricValues.put(ts - 80000, 101L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues);
metrics.add(m2);
entity1.addMetrics(metrics);
te1.addEntity(entity1);
String flow2 = "flow_name2";
String flowVersion2 = "CF7022C10F1454";
Long runid2 = 2102356789046L;
TimelineEntities te3 = new TimelineEntities();
TimelineEntity entity3 = new TimelineEntity();
id = "flowRunMetrics_test1";
entity3.setId(id);
entity3.setType(type);
cTime = 1425016501030L;
entity3.setCreatedTime(cTime);
TimelineEvent event2 = new TimelineEvent();
event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event2.setTimestamp(1436512802030L);
event2.addInfo("foo_event", "test");
entity3.addEvent(event2);
te3.addEntity(entity3);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
appName = "application_11111111111111_2223";
hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
hbi.flush();
} finally {
hbi.close();
}
}
@AfterClass
public static void tearDown() throws Exception {
util.shutdownMiniCluster();
}
@Before
public void init() throws Exception {
try {
Configuration config = util.getConfiguration();
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
"localhost:0");
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
"org.apache.hadoop.yarn.server.timelineservice.storage." +
"HBaseTimelineReaderImpl");
config.setInt("hfile.format.version", 3);
server = new TimelineReaderServer();
server.init(config);
server.start();
serverPort = server.getWebServerPort();
} catch (Exception e) {
Assert.fail("Web server failed to start");
}
}
private static Client createClient() {
ClientConfig cfg = new DefaultClientConfig();
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
return new Client(new URLConnectionClientHandler(
new DummyURLConnectionFactory()), cfg);
}
private static ClientResponse getResponse(Client client, URI uri)
throws Exception {
ClientResponse resp =
client.resource(uri).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg = new String();
if (resp != null) {
msg = resp.getClientResponseStatus().toString();
}
throw new IOException("Incorrect response from timeline reader. " +
"Status=" + msg);
}
return resp;
}
private static class DummyURLConnectionFactory
implements HttpURLConnectionFactory {
@Override
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
try {
return (HttpURLConnection)url.openConnection();
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
}
}
}
private static TimelineMetric newMetric(String id, long ts, Number value) {
TimelineMetric metric = new TimelineMetric();
metric.setId(id);
metric.addValue(ts, value);
return metric;
}
private static boolean verifyMetricValues(Map<Long, Number> m1,
Map<Long, Number> m2) {
for (Map.Entry<Long, Number> entry : m1.entrySet()) {
if (!m2.containsKey(entry.getKey())) {
return false;
}
if (m2.get(entry.getKey()).equals(entry.getValue())) {
return false;
}
}
return true;
}
private static boolean verifyMetrics(
TimelineMetric m, TimelineMetric... metrics) {
for (TimelineMetric metric : metrics) {
if (!metric.equals(m)) {
continue;
}
if (!verifyMetricValues(metric.getValues(), m.getValues())) {
continue;
}
return true;
}
return false;
}
@Test
public void testGetFlowRun() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
ClientResponse resp = getResponse(client, uri);
FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
assertNotNull(entity);
assertEquals("user1@flow_name/1002345678919", entity.getId());
assertEquals(2, entity.getMetrics().size());
TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
for (TimelineMetric metric : entity.getMetrics()) {
assertTrue(verifyMetrics(metric, m1, m2));
}
// Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flowrun/flow_name/1002345678919?userid=user1");
resp = getResponse(client, uri);
entity = resp.getEntity(FlowRunEntity.class);
assertNotNull(entity);
assertEquals("user1@flow_name/1002345678919", entity.getId());
assertEquals(2, entity.getMetrics().size());
m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
for (TimelineMetric metric : entity.getMetrics()) {
assertTrue(verifyMetrics(metric, m1, m2));
}
} finally {
client.destroy();
}
}
@Test
public void testGetFlows() throws Exception {
Client client = createClient();
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1");
ClientResponse resp = getResponse(client, uri);
Set<FlowActivityEntity> entities =
resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
for (FlowActivityEntity entity : entities) {
assertTrue((entity.getId().endsWith("@flow_name") &&
entity.getFlowRuns().size() == 2) ||
(entity.getId().endsWith("@flow_name2") &&
entity.getFlowRuns().size() == 1));
}
// Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(2, entities.size());
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/cluster1?limit=1");
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(entities);
assertEquals(1, entities.size());
} finally {
client.destroy();
}
}
@After
public void stop() throws Exception {
if (server != null) {
server.stop();
server = null;
}
}
}

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n