YARN-3864. Implement support for querying single app and all apps for a flow run (Varun Saxena via sjlee)
This commit is contained in:
parent
708fa8b1ae
commit
0f44b5508d
@ -286,7 +286,7 @@ public Set<TimelineEntity> getEntities(
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
(null == req.getQueryString() ? "" :
|
||||
(req.getQueryString() == null ? "" :
|
||||
QUERY_STRING_SEP + req.getQueryString());
|
||||
UserGroupInformation callerUGI = getUser(req);
|
||||
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
|
||||
@ -310,7 +310,7 @@ public Set<TimelineEntity> getEntities(
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
handleException(e, url, startTime,
|
||||
"createdTime or modifiedTime start/end or limit or flowId");
|
||||
"createdTime or modifiedTime start/end or limit or flowrunid");
|
||||
}
|
||||
long endTime = Time.monotonicNow();
|
||||
if (entities == null) {
|
||||
@ -360,7 +360,7 @@ public TimelineEntity getEntity(
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
(null == req.getQueryString() ? "" :
|
||||
(req.getQueryString() == null ? "" :
|
||||
QUERY_STRING_SEP + req.getQueryString());
|
||||
UserGroupInformation callerUGI = getUser(req);
|
||||
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
|
||||
@ -420,7 +420,7 @@ public TimelineEntity getFlowRun(
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
(null == req.getQueryString() ? "" :
|
||||
(req.getQueryString() == null ? "" :
|
||||
QUERY_STRING_SEP + req.getQueryString());
|
||||
UserGroupInformation callerUGI = getUser(req);
|
||||
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
|
||||
@ -477,7 +477,7 @@ public Set<TimelineEntity> getFlows(
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
(null == req.getQueryString() ? "" :
|
||||
(req.getQueryString() == null ? "" :
|
||||
QUERY_STRING_SEP + req.getQueryString());
|
||||
UserGroupInformation callerUGI = getUser(req);
|
||||
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
|
||||
@ -502,4 +502,200 @@ null, parseStr(clusterId), null, null, null,
|
||||
" (Took " + (endTime - startTime) + " ms.)");
|
||||
return entities;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a single app for given app id. Cluster ID is not provided by
|
||||
* client so default cluster ID has to be taken.
|
||||
*/
|
||||
@GET
|
||||
@Path("/app/{appid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public TimelineEntity getApp(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("appid") String appId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getApp(req, res, null, appId, flowId, flowRunId, userId, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a single app for given cluster id and app id.
|
||||
*/
|
||||
@GET
|
||||
@Path("/app/{clusterid}/{appid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public TimelineEntity getApp(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("appid") String appId,
|
||||
@QueryParam("flowid") String flowId,
|
||||
@QueryParam("flowrunid") String flowRunId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("fields") String fields) {
|
||||
String url = req.getRequestURI() +
|
||||
(req.getQueryString() == null ? "" :
|
||||
QUERY_STRING_SEP + req.getQueryString());
|
||||
UserGroupInformation callerUGI = getUser(req);
|
||||
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
|
||||
long startTime = Time.monotonicNow();
|
||||
init(res);
|
||||
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
|
||||
TimelineEntity entity = null;
|
||||
try {
|
||||
entity = timelineReaderManager.getEntity(
|
||||
parseUser(callerUGI, userId), parseStr(clusterId),
|
||||
parseStr(flowId), parseLongStr(flowRunId), parseStr(appId),
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), null,
|
||||
parseFieldsStr(fields, COMMA_DELIMITER));
|
||||
} catch (Exception e) {
|
||||
handleException(e, url, startTime, "flowrunid");
|
||||
}
|
||||
long endTime = Time.monotonicNow();
|
||||
if (entity == null) {
|
||||
LOG.info("Processed URL " + url + " but app not found" + " (Took " +
|
||||
(endTime - startTime) + " ms.)");
|
||||
throw new NotFoundException("App " + appId + " not found");
|
||||
}
|
||||
LOG.info("Processed URL " + url +
|
||||
" (Took " + (endTime - startTime) + " ms.)");
|
||||
return entity;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a list of apps for given flow id and flow run id. Cluster ID is not
|
||||
* provided by client so default cluster ID has to be taken. If number of
|
||||
* matching apps are more than the limit, most recent apps till the limit is
|
||||
* reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowrunapps/{flowid}/{flowrunid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowRunApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowrunid") String flowRunId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@QueryParam("conffilters") String conffilters,
|
||||
@QueryParam("metricfilters") String metricfilters,
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a list of apps for a given cluster id, flow id and flow run id. If
|
||||
* number of matching apps are more than the limit, most recent apps till the
|
||||
* limit is reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowRunApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@PathParam("flowrunid") String flowRunId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@QueryParam("conffilters") String conffilters,
|
||||
@QueryParam("metricfilters") String metricfilters,
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, clusterId, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a list of apps for given flow id. Cluster ID is not provided by
|
||||
* client so default cluster ID has to be taken. If number of matching apps
|
||||
* are more than the limit, most recent apps till the limit is reached, will
|
||||
* be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowapps/{flowid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("flowid") String flowId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@QueryParam("conffilters") String conffilters,
|
||||
@QueryParam("metricfilters") String metricfilters,
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, null, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a list of apps for a given cluster id and flow id. If number of
|
||||
* matching apps are more than the limit, most recent apps till the limit is
|
||||
* reached, will be returned.
|
||||
*/
|
||||
@GET
|
||||
@Path("/flowapps/{clusterid}/{flowid}/")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Set<TimelineEntity> getFlowApps(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("clusterid") String clusterId,
|
||||
@PathParam("flowid") String flowId,
|
||||
@QueryParam("userid") String userId,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("createdtimestart") String createdTimeStart,
|
||||
@QueryParam("createdtimeend") String createdTimeEnd,
|
||||
@QueryParam("modifiedtimestart") String modifiedTimeStart,
|
||||
@QueryParam("modifiedtimeend") String modifiedTimeEnd,
|
||||
@QueryParam("relatesto") String relatesTo,
|
||||
@QueryParam("isrelatedto") String isRelatedTo,
|
||||
@QueryParam("infofilters") String infofilters,
|
||||
@QueryParam("conffilters") String conffilters,
|
||||
@QueryParam("metricfilters") String metricfilters,
|
||||
@QueryParam("eventfilters") String eventfilters,
|
||||
@QueryParam("fields") String fields) {
|
||||
return getEntities(req, res, clusterId, null,
|
||||
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
|
||||
null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
|
||||
modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
|
||||
metricfilters, eventfilters, fields);
|
||||
}
|
||||
}
|
@ -18,7 +18,6 @@
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -28,6 +27,8 @@
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.PageFilter;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
@ -38,6 +39,8 @@
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Timeline entity reader for application entities that are stored in the
|
||||
* application table.
|
||||
@ -57,7 +60,7 @@ public ApplicationEntityReader(String userId, String clusterId,
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, fieldsToRetrieve);
|
||||
eventFilters, fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public ApplicationEntityReader(String userId, String clusterId,
|
||||
@ -85,11 +88,64 @@ protected Result getResult(Configuration hbaseConf, Connection conn)
|
||||
return table.getResult(hbaseConf, conn, get);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validateParams() {
|
||||
Preconditions.checkNotNull(userId, "userId shouldn't be null");
|
||||
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
|
||||
Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
|
||||
if (singleEntityRead) {
|
||||
Preconditions.checkNotNull(appId, "appId shouldn't be null");
|
||||
} else {
|
||||
Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void augmentParams(Configuration hbaseConf, Connection conn)
|
||||
throws IOException {
|
||||
if (singleEntityRead) {
|
||||
if (flowId == null || flowRunId == null) {
|
||||
FlowContext context =
|
||||
lookupFlowContext(clusterId, appId, hbaseConf, conn);
|
||||
flowId = context.flowId;
|
||||
flowRunId = context.flowRunId;
|
||||
}
|
||||
}
|
||||
if (fieldsToRetrieve == null) {
|
||||
fieldsToRetrieve = EnumSet.noneOf(Field.class);
|
||||
}
|
||||
if (!singleEntityRead) {
|
||||
if (limit == null || limit < 0) {
|
||||
limit = TimelineReader.DEFAULT_LIMIT;
|
||||
}
|
||||
if (createdTimeBegin == null) {
|
||||
createdTimeBegin = DEFAULT_BEGIN_TIME;
|
||||
}
|
||||
if (createdTimeEnd == null) {
|
||||
createdTimeEnd = DEFAULT_END_TIME;
|
||||
}
|
||||
if (modifiedTimeBegin == null) {
|
||||
modifiedTimeBegin = DEFAULT_BEGIN_TIME;
|
||||
}
|
||||
if (modifiedTimeEnd == null) {
|
||||
modifiedTimeEnd = DEFAULT_END_TIME;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResultScanner getResults(Configuration hbaseConf,
|
||||
Connection conn) throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"we don't support multiple apps query");
|
||||
Scan scan = new Scan();
|
||||
if (flowRunId != null) {
|
||||
scan.setRowPrefixFilter(ApplicationRowKey.
|
||||
getRowKeyPrefix(clusterId, userId, flowId, flowRunId));
|
||||
} else {
|
||||
scan.setRowPrefixFilter(ApplicationRowKey.
|
||||
getRowKeyPrefix(clusterId, userId, flowId));
|
||||
}
|
||||
scan.setFilter(new PageFilter(limit));
|
||||
return table.getResultScanner(hbaseConf, conn, scan);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -20,9 +20,7 @@
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
@ -60,7 +58,7 @@ public FlowActivityEntityReader(String userId, String clusterId,
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, fieldsToRetrieve);
|
||||
eventFilters, fieldsToRetrieve, true);
|
||||
}
|
||||
|
||||
public FlowActivityEntityReader(String userId, String clusterId,
|
||||
@ -78,35 +76,6 @@ protected BaseTable<?> getTable() {
|
||||
return FLOW_ACTIVITY_TABLE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Since this is strictly sorted by the row key, it is sufficient to collect
|
||||
* the first results as specified by the limit.
|
||||
*/
|
||||
@Override
|
||||
public Set<TimelineEntity> readEntities(Configuration hbaseConf,
|
||||
Connection conn) throws IOException {
|
||||
validateParams();
|
||||
augmentParams(hbaseConf, conn);
|
||||
|
||||
NavigableSet<TimelineEntity> entities = new TreeSet<>();
|
||||
ResultScanner results = getResults(hbaseConf, conn);
|
||||
try {
|
||||
for (Result result : results) {
|
||||
TimelineEntity entity = parseEntity(result);
|
||||
if (entity == null) {
|
||||
continue;
|
||||
}
|
||||
entities.add(entity);
|
||||
if (entities.size() == limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return entities;
|
||||
} finally {
|
||||
results.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validateParams() {
|
||||
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
|
||||
|
@ -56,7 +56,7 @@ public FlowRunEntityReader(String userId, String clusterId,
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, fieldsToRetrieve);
|
||||
eventFilters, fieldsToRetrieve, false);
|
||||
}
|
||||
|
||||
public FlowRunEntityReader(String userId, String clusterId,
|
||||
|
@ -61,8 +61,8 @@ class GenericEntityReader extends TimelineEntityReader {
|
||||
private static final EntityTable ENTITY_TABLE = new EntityTable();
|
||||
private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
|
||||
|
||||
private static final long DEFAULT_BEGIN_TIME = 0L;
|
||||
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
|
||||
protected static final long DEFAULT_BEGIN_TIME = 0L;
|
||||
protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Used to look up the flow context.
|
||||
@ -76,11 +76,11 @@ public GenericEntityReader(String userId, String clusterId,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
|
||||
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
|
||||
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
|
||||
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
|
||||
eventFilters, fieldsToRetrieve);
|
||||
eventFilters, fieldsToRetrieve, sortedKeys);
|
||||
}
|
||||
|
||||
public GenericEntityReader(String userId, String clusterId,
|
||||
@ -97,7 +97,7 @@ protected BaseTable<?> getTable() {
|
||||
return ENTITY_TABLE;
|
||||
}
|
||||
|
||||
private FlowContext lookupFlowContext(String clusterId, String appId,
|
||||
protected FlowContext lookupFlowContext(String clusterId, String appId,
|
||||
Configuration hbaseConf, Connection conn) throws IOException {
|
||||
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
|
||||
Get get = new Get(rowKey);
|
||||
@ -113,9 +113,9 @@ private FlowContext lookupFlowContext(String clusterId, String appId,
|
||||
}
|
||||
}
|
||||
|
||||
private static class FlowContext {
|
||||
private final String flowId;
|
||||
private final Long flowRunId;
|
||||
protected static class FlowContext {
|
||||
protected final String flowId;
|
||||
protected final Long flowRunId;
|
||||
public FlowContext(String flowId, Long flowRunId) {
|
||||
this.flowId = flowId;
|
||||
this.flowRunId = flowRunId;
|
||||
|
@ -73,6 +73,14 @@ abstract class TimelineEntityReader {
|
||||
*/
|
||||
protected BaseTable<?> table;
|
||||
|
||||
/**
|
||||
* Specifies whether keys for this table are sorted in a manner where entities
|
||||
* can be retrieved by created time. If true, it will be sufficient to collect
|
||||
* the first results as specified by the limit. Otherwise all matched entities
|
||||
* will be fetched and then limit applied.
|
||||
*/
|
||||
private boolean sortedKeys = false;
|
||||
|
||||
/**
|
||||
* Instantiates a reader for multiple-entity reads.
|
||||
*/
|
||||
@ -83,8 +91,9 @@ protected TimelineEntityReader(String userId, String clusterId,
|
||||
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
|
||||
Map<String, Object> infoFilters, Map<String, String> configFilters,
|
||||
Set<String> metricFilters, Set<String> eventFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) {
|
||||
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
|
||||
this.singleEntityRead = false;
|
||||
this.sortedKeys = sortedKeys;
|
||||
this.userId = userId;
|
||||
this.clusterId = clusterId;
|
||||
this.flowId = flowId;
|
||||
@ -162,8 +171,14 @@ public Set<TimelineEntity> readEntities(Configuration hbaseConf,
|
||||
continue;
|
||||
}
|
||||
entities.add(entity);
|
||||
if (entities.size() > limit) {
|
||||
entities.pollLast();
|
||||
if (!sortedKeys) {
|
||||
if (entities.size() > limit) {
|
||||
entities.pollLast();
|
||||
}
|
||||
} else {
|
||||
if (entities.size() == limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return entities;
|
||||
|
@ -91,7 +91,7 @@ public static TimelineEntityReader createMultipleEntitiesReader(String userId,
|
||||
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
|
||||
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
|
||||
infoFilters, configFilters, metricFilters, eventFilters,
|
||||
fieldsToRetrieve);
|
||||
fieldsToRetrieve, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -60,6 +60,40 @@ public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the application table as follows:
|
||||
* {@code clusterId!userName!flowId!}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId) {
|
||||
byte[] first = Bytes.toBytes(
|
||||
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
|
||||
return Separator.QUALIFIERS.join(first, new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key prefix for the application table as follows:
|
||||
* {@code clusterId!userName!flowId!flowRunId!}
|
||||
*
|
||||
* @param clusterId
|
||||
* @param userId
|
||||
* @param flowId
|
||||
* @param flowRunId
|
||||
* @return byte array with the row key prefix
|
||||
*/
|
||||
public static byte[] getRowKeyPrefix(String clusterId, String userId,
|
||||
String flowId, Long flowRunId) {
|
||||
byte[] first = Bytes.toBytes(
|
||||
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
|
||||
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
|
||||
return Separator.QUALIFIERS.join(first, second, new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a row key for the application table as follows:
|
||||
* {@code clusterId!userName!flowId!flowRunId!AppId}
|
||||
|
@ -65,7 +65,7 @@
|
||||
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||
|
||||
public class TestTimelineReaderWebServicesFlowRun {
|
||||
public class TestTimelineReaderWebServicesHBaseStorage {
|
||||
private int serverPort;
|
||||
private TimelineReaderServer server;
|
||||
private static HBaseTestingUtility util;
|
||||
@ -91,12 +91,13 @@ private static void loadData() throws Exception {
|
||||
|
||||
TimelineEntities te = new TimelineEntities();
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "flowRunMetrics_test";
|
||||
String id = "application_1111111111_1111";
|
||||
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
Long cTime = 1425016501000L;
|
||||
entity.setCreatedTime(cTime);
|
||||
entity.addConfig("cfg2", "value1");
|
||||
|
||||
// add metrics
|
||||
Set<TimelineMetric> metrics = new HashSet<>();
|
||||
@ -127,17 +128,24 @@ private static void loadData() throws Exception {
|
||||
Object expVal = "test";
|
||||
event.addInfo(expKey, expVal);
|
||||
entity.addEvent(event);
|
||||
TimelineEvent event11 = new TimelineEvent();
|
||||
event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
expTs = 1436512802010L;
|
||||
event11.setTimestamp(expTs);
|
||||
entity.addEvent(event11);
|
||||
|
||||
te.addEntity(entity);
|
||||
|
||||
// write another application with same metric to this flow
|
||||
TimelineEntities te1 = new TimelineEntities();
|
||||
TimelineEntity entity1 = new TimelineEntity();
|
||||
id = "flowRunMetrics_test";
|
||||
id = "application_1111111111_2222";
|
||||
type = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
entity1.setId(id);
|
||||
entity1.setType(type);
|
||||
cTime = 1425016501000L;
|
||||
entity1.setCreatedTime(cTime);
|
||||
entity1.addConfig("cfg1", "value1");
|
||||
// add metrics
|
||||
metrics.clear();
|
||||
TimelineMetric m2 = new TimelineMetric();
|
||||
@ -149,6 +157,11 @@ private static void loadData() throws Exception {
|
||||
m2.setValues(metricValues);
|
||||
metrics.add(m2);
|
||||
entity1.addMetrics(metrics);
|
||||
TimelineEvent event1 = new TimelineEvent();
|
||||
event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||
event1.setTimestamp(expTs);
|
||||
event1.addInfo(expKey, expVal);
|
||||
entity1.addEvent(event1);
|
||||
te1.addEntity(entity1);
|
||||
|
||||
String flow2 = "flow_name2";
|
||||
@ -156,7 +169,7 @@ private static void loadData() throws Exception {
|
||||
Long runid2 = 2102356789046L;
|
||||
TimelineEntities te3 = new TimelineEntities();
|
||||
TimelineEntity entity3 = new TimelineEntity();
|
||||
id = "flowRunMetrics_test1";
|
||||
id = "application_11111111111111_2223";
|
||||
entity3.setId(id);
|
||||
entity3.setType(type);
|
||||
cTime = 1425016501030L;
|
||||
@ -168,18 +181,30 @@ private static void loadData() throws Exception {
|
||||
entity3.addEvent(event2);
|
||||
te3.addEntity(entity3);
|
||||
|
||||
TimelineEntities te4 = new TimelineEntities();
|
||||
TimelineEntity entity4 = new TimelineEntity();
|
||||
id = "application_1111111111_2224";
|
||||
entity4.setId(id);
|
||||
entity4.setType(type);
|
||||
cTime = 1425016501034L;
|
||||
entity4.setCreatedTime(cTime);
|
||||
TimelineEvent event4 = new TimelineEvent();
|
||||
event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||
event4.setTimestamp(1436512802037L);
|
||||
event4.addInfo("foo_event", "test");
|
||||
entity4.addEvent(event4);
|
||||
te4.addEntity(entity4);
|
||||
|
||||
HBaseTimelineWriterImpl hbi = null;
|
||||
Configuration c1 = util.getConfiguration();
|
||||
try {
|
||||
hbi = new HBaseTimelineWriterImpl(c1);
|
||||
hbi.init(c1);
|
||||
String appName = "application_11111111111111_1111";
|
||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
||||
appName = "application_11111111111111_2222";
|
||||
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
|
||||
hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
|
||||
appName = "application_11111111111111_2223";
|
||||
hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
|
||||
hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
|
||||
hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
|
||||
hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
|
||||
hbi.write(cluster, user, flow2,
|
||||
flowVersion2, runid2, entity3.getId(), te3);
|
||||
hbi.flush();
|
||||
} finally {
|
||||
hbi.close();
|
||||
@ -248,8 +273,15 @@ public HttpURLConnection getHttpURLConnection(final URL url) throws IOException
|
||||
}
|
||||
}
|
||||
|
||||
private static TimelineMetric newMetric(String id, long ts, Number value) {
|
||||
TimelineMetric metric = new TimelineMetric();
|
||||
private static TimelineEntity newEntity(String type, String id) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setIdentifier(new TimelineEntity.Identifier(type, id));
|
||||
return entity;
|
||||
}
|
||||
|
||||
private static TimelineMetric newMetric(TimelineMetric.Type type,
|
||||
String id, long ts, Number value) {
|
||||
TimelineMetric metric = new TimelineMetric(type);
|
||||
metric.setId(id);
|
||||
metric.addValue(ts, value);
|
||||
return metric;
|
||||
@ -304,8 +336,10 @@ public void testGetFlowRun() throws Exception {
|
||||
assertNotNull(entity);
|
||||
assertEquals("user1@flow_name/1002345678919", entity.getId());
|
||||
assertEquals(2, entity.getMetrics().size());
|
||||
TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
|
||||
TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
|
||||
TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
||||
"HDFS_BYTES_READ", ts - 80000, 57L);
|
||||
TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
||||
"MAP_SLOT_MILLIS", ts - 80000, 141L);
|
||||
for (TimelineMetric metric : entity.getMetrics()) {
|
||||
assertTrue(verifyMetrics(metric, m1, m2));
|
||||
}
|
||||
@ -318,8 +352,10 @@ public void testGetFlowRun() throws Exception {
|
||||
assertNotNull(entity);
|
||||
assertEquals("user1@flow_name/1002345678919", entity.getId());
|
||||
assertEquals(2, entity.getMetrics().size());
|
||||
m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
|
||||
m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
|
||||
m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
||||
"HDFS_BYTES_READ", ts - 80000, 57L);
|
||||
m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
||||
"MAP_SLOT_MILLIS", ts - 80000, 141L);
|
||||
for (TimelineMetric metric : entity.getMetrics()) {
|
||||
assertTrue(verifyMetrics(metric, m1, m2));
|
||||
}
|
||||
@ -365,6 +401,192 @@ public void testGetFlows() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApp() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/cluster1/application_1111111111_1111?" +
|
||||
"userid=user1&fields=ALL&flowid=flow_name&flowrunid=1002345678919");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
assertEquals("application_1111111111_1111", entity.getId());
|
||||
assertEquals(2, entity.getMetrics().size());
|
||||
TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
||||
"HDFS_BYTES_READ", ts - 100000, 31L);
|
||||
m1.addValue(ts - 80000, 57L);
|
||||
TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
||||
"MAP_SLOT_MILLIS", ts - 100000, 2L);
|
||||
m2.addValue(ts - 80000, 40L);
|
||||
for (TimelineMetric metric : entity.getMetrics()) {
|
||||
assertTrue(verifyMetrics(metric, m1, m2));
|
||||
}
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/application_1111111111_2222?userid=user1" +
|
||||
"&fields=metrics&flowid=flow_name&flowrunid=1002345678919");
|
||||
resp = getResponse(client, uri);
|
||||
entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
assertEquals("application_1111111111_2222", entity.getId());
|
||||
assertEquals(1, entity.getMetrics().size());
|
||||
TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
||||
"MAP_SLOT_MILLIS", ts - 100000, 5L);
|
||||
m2.addValue(ts - 80000, 101L);
|
||||
for (TimelineMetric metric : entity.getMetrics()) {
|
||||
assertTrue(verifyMetrics(metric, m3));
|
||||
}
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAppWithoutFlowInfo() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/cluster1/application_1111111111_1111?" +
|
||||
"userid=user1&fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
assertNotNull(entity);
|
||||
assertEquals("application_1111111111_1111", entity.getId());
|
||||
assertEquals(2, entity.getMetrics().size());
|
||||
TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
||||
"HDFS_BYTES_READ", ts - 100000, 31L);
|
||||
m1.addValue(ts - 80000, 57L);
|
||||
TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
||||
"MAP_SLOT_MILLIS", ts - 100000, 2L);
|
||||
m2.addValue(ts - 80000, 40L);
|
||||
for (TimelineMetric metric : entity.getMetrics()) {
|
||||
assertTrue(verifyMetrics(metric, m1, m2));
|
||||
}
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowRunApps() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/cluster1/flow_name/1002345678919?" +
|
||||
"userid=user1&fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(2, entities.size());
|
||||
for (TimelineEntity entity : entities) {
|
||||
assertTrue("Unexpected app in result",
|
||||
(entity.getId().equals("application_1111111111_1111") &&
|
||||
entity.getMetrics().size() == 2) ||
|
||||
(entity.getId().equals("application_1111111111_2222") &&
|
||||
entity.getMetrics().size() == 1));
|
||||
}
|
||||
|
||||
// Query without specifying cluster ID.
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/flow_name/1002345678919?userid=user1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(2, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(1, entities.size());
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowApps() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(3, entities.size());
|
||||
for (TimelineEntity entity : entities) {
|
||||
assertTrue("Unexpected app in result",
|
||||
(entity.getId().equals("application_1111111111_1111") &&
|
||||
entity.getMetrics().size() == 2) ||
|
||||
(entity.getId().equals("application_1111111111_2222") &&
|
||||
entity.getMetrics().size() == 1) ||
|
||||
(entity.getId().equals("application_1111111111_2224") &&
|
||||
entity.getMetrics().size() == 0));
|
||||
}
|
||||
|
||||
// Query without specifying cluster ID.
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/flow_name?userid=user1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(3, entities.size());
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/flow_name?userid=user1&limit=1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(1, entities.size());
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowAppsFilters() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
String entityType = TimelineEntityType.YARN_APPLICATION.toString();
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" +
|
||||
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(1, entities.size());
|
||||
assertTrue("Unexpected app in result", entities.contains(
|
||||
newEntity(entityType, "application_1111111111_1111")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" +
|
||||
"HDFS_BYTES_READ");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(1, entities.size());
|
||||
assertTrue("Unexpected app in result", entities.contains(
|
||||
newEntity(entityType, "application_1111111111_1111")));
|
||||
|
||||
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" +
|
||||
"cfg1:value1");
|
||||
resp = getResponse(client, uri);
|
||||
entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertNotNull(entities);
|
||||
assertEquals(1, entities.size());
|
||||
assertTrue("Unexpected app in result", entities.contains(
|
||||
newEntity(entityType, "application_1111111111_2222")));
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowRunNotPresent() throws Exception {
|
||||
Client client = createClient();
|
||||
@ -394,6 +616,53 @@ public void testGetFlowsNotPresent() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAppNotPresent() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/app/cluster1/flow_name/1002345678919/" +
|
||||
"application_1111111111_1378?userid=user1");
|
||||
verifyHttpResponse(client, uri, Status.NOT_FOUND);
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowRunAppsNotPresent() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowrunapps/cluster2/flow_name/1002345678919");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
assertNotNull(entities);
|
||||
assertEquals(0, entities.size());
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFlowAppsNotPresent() throws Exception {
|
||||
Client client = createClient();
|
||||
try {
|
||||
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
||||
"timeline/flowapps/cluster2/flow_name55");
|
||||
ClientResponse resp = getResponse(client, uri);
|
||||
Set<TimelineEntity> entities =
|
||||
resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
||||
assertNotNull(entities);
|
||||
assertEquals(0, entities.size());
|
||||
} finally {
|
||||
client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void stop() throws Exception {
|
||||
if (server != null) {
|
||||
@ -401,5 +670,4 @@ public void stop() throws Exception {
|
||||
server = null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user