YARN-1838. Enhanced timeline service getEntities API to get entities from a given entity ID or insertion timestamp. Contributed by Billie Rinaldi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580960 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2d89dcb121
commit
0f1eda6bbf
|
@ -178,6 +178,9 @@ Release 2.4.0 - UNRELEASED
|
|||
use smaps for obtaining used memory information. (Rajesh Balamohan via
|
||||
vinodkv)
|
||||
|
||||
YARN-1838. Enhanced timeline service getEntities API to get entities from a
|
||||
given entity ID or insertion timestamp. (Billie Rinaldi via zjshen)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
||||
|
|
|
@ -192,7 +192,7 @@ public class TestDistributedShell {
|
|||
.getApplicationHistoryServer()
|
||||
.getTimelineStore()
|
||||
.getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
|
||||
null, null, null, null, null, null);
|
||||
null, null, null, null, null, null, null, null);
|
||||
Assert.assertNotNull(entitiesAttempts);
|
||||
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
|
||||
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
|
||||
|
@ -203,7 +203,7 @@ public class TestDistributedShell {
|
|||
.getApplicationHistoryServer()
|
||||
.getTimelineStore()
|
||||
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
|
||||
null, null, null, null, null);
|
||||
null, null, null, null, null, null, null);
|
||||
Assert.assertNotNull(entities);
|
||||
Assert.assertEquals(2, entities.getEntities().size());
|
||||
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
|
||||
|
|
|
@ -102,11 +102,15 @@ public class GenericObjectMapper {
|
|||
*/
|
||||
public static byte[] writeReverseOrderedLong(long l) {
|
||||
byte[] b = new byte[8];
|
||||
b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff));
|
||||
for (int i = 1; i < 7; i++) {
|
||||
return writeReverseOrderedLong(l, b, 0);
|
||||
}
|
||||
|
||||
public static byte[] writeReverseOrderedLong(long l, byte[] b, int offset) {
|
||||
b[offset] = (byte)(0x7f ^ ((l >> 56) & 0xff));
|
||||
for (int i = offset+1; i < offset+7; i++) {
|
||||
b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
|
||||
}
|
||||
b[7] = (byte)(0xff ^ (l & 0xff));
|
||||
b[offset+7] = (byte)(0xff ^ (l & 0xff));
|
||||
return b;
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
|
||||
private static final byte[] EMPTY_BYTES = new byte[0];
|
||||
|
||||
private Map<EntityIdentifier, Long> startTimeWriteCache;
|
||||
private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
|
||||
private Map<EntityIdentifier, Long> startTimeReadCache;
|
||||
|
||||
/**
|
||||
|
@ -205,6 +205,16 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
super.serviceStop();
|
||||
}
|
||||
|
||||
private static class StartAndInsertTime {
|
||||
final long startTime;
|
||||
final long insertTime;
|
||||
|
||||
public StartAndInsertTime(long startTime, long insertTime) {
|
||||
this.startTime = startTime;
|
||||
this.insertTime = insertTime;
|
||||
}
|
||||
}
|
||||
|
||||
private class EntityDeletionThread extends Thread {
|
||||
private final long ttl;
|
||||
private final long ttlInterval;
|
||||
|
@ -585,14 +595,14 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
|
||||
@Override
|
||||
public TimelineEntities getEntities(String entityType,
|
||||
Long limit, Long windowStart, Long windowEnd,
|
||||
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||
EnumSet<Field> fields) throws IOException {
|
||||
if (primaryFilter == null) {
|
||||
// if no primary filter is specified, prefix the lookup with
|
||||
// ENTITY_ENTRY_PREFIX
|
||||
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
|
||||
windowStart, windowEnd, secondaryFilters, fields);
|
||||
windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields);
|
||||
} else {
|
||||
// if a primary filter is specified, prefix the lookup with
|
||||
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
|
||||
|
@ -602,7 +612,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
|
||||
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
|
||||
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
|
||||
secondaryFilters, fields);
|
||||
fromId, fromTs, secondaryFilters, fields);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -614,6 +624,8 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
* @param limit A limit on the number of entities to return
|
||||
* @param starttime The earliest entity start time to retrieve (exclusive)
|
||||
* @param endtime The latest entity start time to retrieve (inclusive)
|
||||
* @param fromId Retrieve entities starting with this entity
|
||||
* @param fromTs Ignore entities with insert timestamp later than this ts
|
||||
* @param secondaryFilters Filter pairs that the entities should match
|
||||
* @param fields The set of fields to retrieve
|
||||
* @return A list of entities
|
||||
|
@ -621,8 +633,8 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
*/
|
||||
private TimelineEntities getEntityByTime(byte[] base,
|
||||
String entityType, Long limit, Long starttime, Long endtime,
|
||||
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
|
||||
throws IOException {
|
||||
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
|
||||
EnumSet<Field> fields) throws IOException {
|
||||
DBIterator iterator = null;
|
||||
try {
|
||||
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
|
||||
|
@ -632,10 +644,25 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
// if end time is null, place no restriction on end time
|
||||
endtime = Long.MAX_VALUE;
|
||||
}
|
||||
// using end time, construct a first key that will be seeked to
|
||||
byte[] revts = writeReverseOrderedLong(endtime);
|
||||
kb.add(revts);
|
||||
byte[] first = kb.getBytesForLookup();
|
||||
// construct a first key that will be seeked to using end time or fromId
|
||||
byte[] first = null;
|
||||
if (fromId != null) {
|
||||
Long fromIdStartTime = getStartTimeLong(fromId, entityType);
|
||||
if (fromIdStartTime == null) {
|
||||
// no start time for provided id, so return empty entities
|
||||
return new TimelineEntities();
|
||||
}
|
||||
if (fromIdStartTime <= endtime) {
|
||||
// if provided id's start time falls before the end of the window,
|
||||
// use it to construct the seek key
|
||||
first = kb.add(writeReverseOrderedLong(fromIdStartTime))
|
||||
.add(fromId).getBytesForLookup();
|
||||
}
|
||||
}
|
||||
// if seek key wasn't constructed using fromId, construct it using end ts
|
||||
if (first == null) {
|
||||
first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
|
||||
}
|
||||
byte[] last = null;
|
||||
if (starttime != null) {
|
||||
// if start time is not null, set a last key that will not be
|
||||
|
@ -665,6 +692,21 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
KeyParser kp = new KeyParser(key, prefix.length);
|
||||
Long startTime = kp.getNextLong();
|
||||
String entityId = kp.getNextString();
|
||||
|
||||
if (fromTs != null) {
|
||||
long insertTime = readReverseOrderedLong(iterator.peekNext()
|
||||
.getValue(), 0);
|
||||
if (insertTime > fromTs) {
|
||||
byte[] firstKey = key;
|
||||
while (iterator.hasNext() && prefixMatches(firstKey,
|
||||
kp.getOffset(), key)) {
|
||||
iterator.next();
|
||||
key = iterator.peekNext().getKey();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// parse the entity that owns this key, iterating over all keys for
|
||||
// the entity
|
||||
TimelineEntity entity = getEntity(entityId, entityType, startTime,
|
||||
|
@ -715,9 +757,10 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
writeBatch = db.createWriteBatch();
|
||||
List<TimelineEvent> events = entity.getEvents();
|
||||
// look up the start time for the entity
|
||||
revStartTime = getAndSetStartTime(entity.getEntityId(),
|
||||
entity.getEntityType(), entity.getStartTime(), events);
|
||||
if (revStartTime == null) {
|
||||
StartAndInsertTime startAndInsertTime = getAndSetStartTime(
|
||||
entity.getEntityId(), entity.getEntityType(),
|
||||
entity.getStartTime(), events);
|
||||
if (startAndInsertTime == null) {
|
||||
// if no start time is found, add an error and return
|
||||
TimelinePutError error = new TimelinePutError();
|
||||
error.setEntityId(entity.getEntityId());
|
||||
|
@ -726,11 +769,19 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
response.addError(error);
|
||||
return;
|
||||
}
|
||||
revStartTime = writeReverseOrderedLong(startAndInsertTime
|
||||
.startTime);
|
||||
|
||||
Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
|
||||
|
||||
// write entity marker
|
||||
writeBatch.put(createEntityMarkerKey(entity.getEntityId(),
|
||||
entity.getEntityType(), revStartTime), EMPTY_BYTES);
|
||||
byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
|
||||
entity.getEntityType(), revStartTime);
|
||||
byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
|
||||
.insertTime);
|
||||
writeBatch.put(markerKey, markerValue);
|
||||
writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
|
||||
markerValue);
|
||||
|
||||
// write event entries
|
||||
if (events != null && !events.isEmpty()) {
|
||||
|
@ -821,17 +872,21 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
lock = writeLocks.getLock(relatedEntity);
|
||||
lock.lock();
|
||||
try {
|
||||
byte[] relatedEntityStartTime = getAndSetStartTime(
|
||||
relatedEntity.getId(), relatedEntity.getType(),
|
||||
StartAndInsertTime relatedEntityStartAndInsertTime =
|
||||
getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
|
||||
readReverseOrderedLong(revStartTime, 0), null);
|
||||
if (relatedEntityStartTime == null) {
|
||||
if (relatedEntityStartAndInsertTime == null) {
|
||||
throw new IOException("Error setting start time for related entity");
|
||||
}
|
||||
byte[] relatedEntityStartTime = writeReverseOrderedLong(
|
||||
relatedEntityStartAndInsertTime.startTime);
|
||||
db.put(createRelatedEntityKey(relatedEntity.getId(),
|
||||
relatedEntity.getType(), relatedEntityStartTime,
|
||||
entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
|
||||
db.put(createEntityMarkerKey(relatedEntity.getId(),
|
||||
relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
|
||||
relatedEntity.getType(), relatedEntityStartTime),
|
||||
writeReverseOrderedLong(relatedEntityStartAndInsertTime
|
||||
.insertTime));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error putting related entity " + relatedEntity.getId() +
|
||||
" of type " + relatedEntity.getType() + " for entity " +
|
||||
|
@ -937,19 +992,18 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
* @param entityType The type of the entity
|
||||
* @param startTime The start time of the entity, or null
|
||||
* @param events A list of events for the entity, or null
|
||||
* @return A byte array
|
||||
* @return A StartAndInsertTime
|
||||
* @throws IOException
|
||||
*/
|
||||
private byte[] getAndSetStartTime(String entityId, String entityType,
|
||||
Long startTime, List<TimelineEvent> events)
|
||||
private StartAndInsertTime getAndSetStartTime(String entityId,
|
||||
String entityType, Long startTime, List<TimelineEvent> events)
|
||||
throws IOException {
|
||||
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
|
||||
if (startTime == null) {
|
||||
// start time is not provided, so try to look it up
|
||||
if (startTimeWriteCache.containsKey(entity)) {
|
||||
// found the start time in the cache
|
||||
startTime = startTimeWriteCache.get(entity);
|
||||
return writeReverseOrderedLong(startTime);
|
||||
return startTimeWriteCache.get(entity);
|
||||
} else {
|
||||
if (events != null) {
|
||||
// prepare a start time from events in case it is needed
|
||||
|
@ -966,14 +1020,8 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
} else {
|
||||
// start time is provided
|
||||
if (startTimeWriteCache.containsKey(entity)) {
|
||||
// check the provided start time matches the cache
|
||||
if (!startTime.equals(startTimeWriteCache.get(entity))) {
|
||||
// the start time is already in the cache,
|
||||
// and it is different from the provided start time,
|
||||
// so use the one from the cache
|
||||
startTime = startTimeWriteCache.get(entity);
|
||||
}
|
||||
return writeReverseOrderedLong(startTime);
|
||||
// always use start time from cache if it exists
|
||||
return startTimeWriteCache.get(entity);
|
||||
} else {
|
||||
// check the provided start time matches the db
|
||||
return checkStartTimeInDb(entity, startTime);
|
||||
|
@ -988,31 +1036,36 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
* so it adds it back into the cache if it is found. Should only be called
|
||||
* when a lock has been obtained on the entity.
|
||||
*/
|
||||
private byte[] checkStartTimeInDb(EntityIdentifier entity,
|
||||
private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
|
||||
Long suggestedStartTime) throws IOException {
|
||||
StartAndInsertTime startAndInsertTime = null;
|
||||
// create lookup key for start time
|
||||
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
||||
// retrieve value for key
|
||||
byte[] v = db.get(b);
|
||||
byte[] revStartTime;
|
||||
if (v == null) {
|
||||
// start time doesn't exist in db
|
||||
if (suggestedStartTime == null) {
|
||||
return null;
|
||||
}
|
||||
startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
|
||||
System.currentTimeMillis());
|
||||
|
||||
// write suggested start time
|
||||
revStartTime = writeReverseOrderedLong(suggestedStartTime);
|
||||
v = new byte[16];
|
||||
writeReverseOrderedLong(suggestedStartTime, v, 0);
|
||||
writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
|
||||
WriteOptions writeOptions = new WriteOptions();
|
||||
writeOptions.sync(true);
|
||||
db.put(b, revStartTime, writeOptions);
|
||||
db.put(b, v, writeOptions);
|
||||
} else {
|
||||
// found start time in db, so ignore suggested start time
|
||||
suggestedStartTime = readReverseOrderedLong(v, 0);
|
||||
revStartTime = v;
|
||||
startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
|
||||
readReverseOrderedLong(v, 8));
|
||||
}
|
||||
startTimeWriteCache.put(entity, suggestedStartTime);
|
||||
startTimeReadCache.put(entity, suggestedStartTime);
|
||||
return revStartTime;
|
||||
startTimeWriteCache.put(entity, startAndInsertTime);
|
||||
startTimeReadCache.put(entity, startAndInsertTime.startTime);
|
||||
return startAndInsertTime;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1245,8 +1298,6 @@ public class LeveldbTimelineStore extends AbstractService
|
|||
}
|
||||
}
|
||||
|
||||
// warning is suppressed to prevent eclipse from noting unclosed resource
|
||||
@SuppressWarnings("resource")
|
||||
@VisibleForTesting
|
||||
boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
|
||||
DBIterator iterator, DBIterator pfIterator, boolean seeked)
|
||||
|
|
|
@ -24,12 +24,14 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
@ -56,6 +58,8 @@ public class MemoryTimelineStore
|
|||
|
||||
private Map<EntityIdentifier, TimelineEntity> entities =
|
||||
new HashMap<EntityIdentifier, TimelineEntity>();
|
||||
private Map<EntityIdentifier, Long> entityInsertTimes =
|
||||
new HashMap<EntityIdentifier, Long>();
|
||||
|
||||
public MemoryTimelineStore() {
|
||||
super(MemoryTimelineStore.class.getName());
|
||||
|
@ -63,8 +67,9 @@ public class MemoryTimelineStore
|
|||
|
||||
@Override
|
||||
public TimelineEntities getEntities(String entityType, Long limit,
|
||||
Long windowStart, Long windowEnd, NameValuePair primaryFilter,
|
||||
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) {
|
||||
Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||
EnumSet<Field> fields) {
|
||||
if (limit == null) {
|
||||
limit = DEFAULT_LIMIT;
|
||||
}
|
||||
|
@ -77,8 +82,26 @@ public class MemoryTimelineStore
|
|||
if (fields == null) {
|
||||
fields = EnumSet.allOf(Field.class);
|
||||
}
|
||||
|
||||
Iterator<TimelineEntity> entityIterator = null;
|
||||
if (fromId != null) {
|
||||
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
|
||||
entityType));
|
||||
if (firstEntity == null) {
|
||||
return new TimelineEntities();
|
||||
} else {
|
||||
entityIterator = new TreeSet<TimelineEntity>(entities.values())
|
||||
.tailSet(firstEntity, true).iterator();
|
||||
}
|
||||
}
|
||||
if (entityIterator == null) {
|
||||
entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
|
||||
.iterator();
|
||||
}
|
||||
|
||||
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
|
||||
for (TimelineEntity entity : new PriorityQueue<TimelineEntity>(entities.values())) {
|
||||
while (entityIterator.hasNext()) {
|
||||
TimelineEntity entity = entityIterator.next();
|
||||
if (entitiesSelected.size() >= limit) {
|
||||
break;
|
||||
}
|
||||
|
@ -91,6 +114,10 @@ public class MemoryTimelineStore
|
|||
if (entity.getStartTime() > windowEnd) {
|
||||
continue;
|
||||
}
|
||||
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
|
||||
entity.getEntityId(), entity.getEntityType())) > fromTs) {
|
||||
continue;
|
||||
}
|
||||
if (primaryFilter != null &&
|
||||
!matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
|
||||
continue;
|
||||
|
@ -196,6 +223,7 @@ public class MemoryTimelineStore
|
|||
existingEntity.setEntityType(entity.getEntityType());
|
||||
existingEntity.setStartTime(entity.getStartTime());
|
||||
entities.put(entityId, existingEntity);
|
||||
entityInsertTimes.put(entityId, System.currentTimeMillis());
|
||||
}
|
||||
if (entity.getEvents() != null) {
|
||||
if (existingEntity.getEvents() == null) {
|
||||
|
@ -215,9 +243,16 @@ public class MemoryTimelineStore
|
|||
error.setErrorCode(TimelinePutError.NO_START_TIME);
|
||||
response.addError(error);
|
||||
entities.remove(entityId);
|
||||
entityInsertTimes.remove(entityId);
|
||||
continue;
|
||||
} else {
|
||||
existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp());
|
||||
Long min = Long.MAX_VALUE;
|
||||
for (TimelineEvent e : entity.getEvents()) {
|
||||
if (min > e.getTimestamp()) {
|
||||
min = e.getTimestamp();
|
||||
}
|
||||
}
|
||||
existingEntity.setStartTime(min);
|
||||
}
|
||||
}
|
||||
if (entity.getPrimaryFilters() != null) {
|
||||
|
@ -264,6 +299,7 @@ public class MemoryTimelineStore
|
|||
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
|
||||
existingEntity.getEntityId());
|
||||
entities.put(relatedEntityId, relatedEntity);
|
||||
entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,8 +55,11 @@ public interface TimelineReader {
|
|||
final long DEFAULT_LIMIT = 100;
|
||||
|
||||
/**
|
||||
* This method retrieves a list of entity information, {@link TimelineEntity}, sorted
|
||||
* by the starting timestamp for the entity, descending.
|
||||
* This method retrieves a list of entity information, {@link TimelineEntity},
|
||||
* sorted by the starting timestamp for the entity, descending. The starting
|
||||
* timestamp of an entity is a timestamp specified by the client. If it is not
|
||||
* explicitly specified, it will be chosen by the store to be the earliest
|
||||
* timestamp of the events received in the first put for the entity.
|
||||
*
|
||||
* @param entityType
|
||||
* The type of entities to return (required).
|
||||
|
@ -69,6 +72,17 @@ public interface TimelineReader {
|
|||
* @param windowEnd
|
||||
* The latest start timestamp to retrieve (inclusive). If null,
|
||||
* defaults to {@link Long#MAX_VALUE}
|
||||
* @param fromId
|
||||
* If fromId is not null, retrieve entities earlier than and
|
||||
* including the specified ID. If no start time is found for the
|
||||
* specified ID, an empty list of entities will be returned. The
|
||||
* windowEnd parameter will take precedence if the start time of this
|
||||
* entity falls later than windowEnd.
|
||||
* @param fromTs
|
||||
* If fromTs is not null, ignore entities that were inserted into the
|
||||
* store after the given timestamp. The entity's insert timestamp
|
||||
* used for this comparison is the store's system time when the first
|
||||
* put for the entity was received (not the entity's start time).
|
||||
* @param primaryFilter
|
||||
* Retrieves only entities that have the specified primary filter. If
|
||||
* null, retrieves all entities. This is an indexed retrieval, and no
|
||||
|
@ -88,7 +102,7 @@ public interface TimelineReader {
|
|||
* @throws IOException
|
||||
*/
|
||||
TimelineEntities getEntities(String entityType,
|
||||
Long limit, Long windowStart, Long windowEnd,
|
||||
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException;
|
||||
|
||||
|
|
|
@ -134,6 +134,8 @@ public class TimelineWebServices {
|
|||
@QueryParam("secondaryFilter") String secondaryFilter,
|
||||
@QueryParam("windowStart") String windowStart,
|
||||
@QueryParam("windowEnd") String windowEnd,
|
||||
@QueryParam("fromId") String fromId,
|
||||
@QueryParam("fromTs") String fromTs,
|
||||
@QueryParam("limit") String limit,
|
||||
@QueryParam("fields") String fields) {
|
||||
init(res);
|
||||
|
@ -144,6 +146,8 @@ public class TimelineWebServices {
|
|||
parseLongStr(limit),
|
||||
parseLongStr(windowStart),
|
||||
parseLongStr(windowEnd),
|
||||
parseStr(fromId),
|
||||
parseLongStr(fromTs),
|
||||
parsePairStr(primaryFilter, ":"),
|
||||
parsePairsStr(secondaryFilter, ",", ":"),
|
||||
parseFieldsStr(fields, ","));
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
|
||||
import org.iq80.leveldb.DBIterator;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -46,8 +44,7 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class TestLeveldbTimelineStore
|
||||
extends TimelineStoreTestUtils {
|
||||
public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||
private FileContext fsContext;
|
||||
private File fsPath;
|
||||
|
||||
|
@ -87,6 +84,16 @@ public class TestLeveldbTimelineStore
|
|||
super.testGetEntities();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesWithFromId() throws IOException {
|
||||
super.testGetEntitiesWithFromId();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesWithFromTs() throws IOException {
|
||||
super.testGetEntitiesWithFromTs();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesWithPrimaryFilters() throws IOException {
|
||||
super.testGetEntitiesWithPrimaryFilters();
|
||||
|
@ -135,55 +142,45 @@ public class TestLeveldbTimelineStore
|
|||
@Test
|
||||
public void testGetEntityTypes() throws IOException {
|
||||
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
|
||||
assertEquals(2, entityTypes.size());
|
||||
assertEquals(4, entityTypes.size());
|
||||
assertEquals(entityType1, entityTypes.get(0));
|
||||
assertEquals(entityType2, entityTypes.get(1));
|
||||
assertEquals(entityType4, entityTypes.get(2));
|
||||
assertEquals(entityType5, entityTypes.get(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteEntities() throws IOException, InterruptedException {
|
||||
assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(2, getEntities("type_1").size());
|
||||
assertEquals(1, getEntities("type_2").size());
|
||||
|
||||
assertEquals(false, deleteNextEntity(entityType1,
|
||||
writeReverseOrderedLong(122l)));
|
||||
assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(2, getEntities("type_1").size());
|
||||
assertEquals(1, getEntities("type_2").size());
|
||||
|
||||
assertEquals(true, deleteNextEntity(entityType1,
|
||||
writeReverseOrderedLong(123l)));
|
||||
List<TimelineEntity> entities =
|
||||
store.getEntities("type_2", null, null, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
List<TimelineEntity> entities = getEntities("type_2");
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
|
||||
entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
|
||||
EMPTY_MAP, entities.get(0));
|
||||
entities = store.getEntities("type_1", null, null, null, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
((LeveldbTimelineStore)store).discardOldEntities(-123l);
|
||||
assertEquals(1, store.getEntities("type_1", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(1, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||
assertEquals(1, getEntities("type_1").size());
|
||||
assertEquals(0, getEntities("type_2").size());
|
||||
assertEquals(3, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||
|
||||
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
||||
assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(0, getEntities("type_1").size());
|
||||
assertEquals(0, getEntities("type_2").size());
|
||||
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||
assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
|
||||
null, null).getEntities().size());
|
||||
assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -200,14 +197,13 @@ public class TestLeveldbTimelineStore
|
|||
assertEquals(0, response.getErrors().size());
|
||||
|
||||
NameValuePair pfPair = new NameValuePair("user", "otheruser");
|
||||
List<TimelineEntity> entities = store.getEntities("type_1", null, null,
|
||||
null, pfPair, null, null).getEntities();
|
||||
List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1",
|
||||
pfPair);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
|
||||
EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0));
|
||||
|
||||
entities = store.getEntities("type_1", null, null, null,
|
||||
userFilter, null, null).getEntities();
|
||||
entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
@ -215,22 +211,43 @@ public class TestLeveldbTimelineStore
|
|||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
((LeveldbTimelineStore)store).discardOldEntities(-123l);
|
||||
assertEquals(1, store.getEntities("type_1", null, null, null, pfPair, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(2, store.getEntities("type_1", null, null, null, userFilter,
|
||||
null, null).getEntities().size());
|
||||
assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
|
||||
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
||||
|
||||
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
||||
assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(0, getEntities("type_1").size());
|
||||
assertEquals(0, getEntities("type_2").size());
|
||||
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||
|
||||
assertEquals(0, store.getEntities("type_1", null, null, null, pfPair, null,
|
||||
null).getEntities().size());
|
||||
assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
|
||||
null, null).getEntities().size());
|
||||
assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
|
||||
assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromTsWithDeletion()
|
||||
throws IOException, InterruptedException {
|
||||
long l = System.currentTimeMillis();
|
||||
assertEquals(2, getEntitiesFromTs("type_1", l).size());
|
||||
assertEquals(1, getEntitiesFromTs("type_2", l).size());
|
||||
assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
l).size());
|
||||
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
||||
assertEquals(0, getEntitiesFromTs("type_1", l).size());
|
||||
assertEquals(0, getEntitiesFromTs("type_2", l).size());
|
||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
l).size());
|
||||
assertEquals(0, getEntities("type_1").size());
|
||||
assertEquals(0, getEntities("type_2").size());
|
||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
l).size());
|
||||
loadTestData();
|
||||
assertEquals(0, getEntitiesFromTs("type_1", l).size());
|
||||
assertEquals(0, getEntitiesFromTs("type_2", l).size());
|
||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
l).size());
|
||||
assertEquals(2, getEntities("type_1").size());
|
||||
assertEquals(1, getEntities("type_2").size());
|
||||
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,16 +19,13 @@
|
|||
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TestMemoryTimelineStore
|
||||
extends TimelineStoreTestUtils {
|
||||
public class TestMemoryTimelineStore extends TimelineStoreTestUtils {
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
@ -58,6 +55,16 @@ public class TestMemoryTimelineStore
|
|||
super.testGetEntities();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesWithFromId() throws IOException {
|
||||
super.testGetEntitiesWithFromId();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesWithFromTs() throws IOException {
|
||||
super.testGetEntitiesWithFromTs();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetEntitiesWithPrimaryFilters() throws IOException {
|
||||
super.testGetEntitiesWithPrimaryFilters();
|
||||
|
|
|
@ -41,12 +41,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
|
||||
|
||||
public class TimelineStoreTestUtils {
|
||||
|
||||
protected static final List<TimelineEvent> EMPTY_EVENTS =
|
||||
Collections.emptyList();
|
||||
protected static final Map<String, Object> EMPTY_MAP =
|
||||
Collections.emptyMap();
|
||||
protected static final Map<String, Set<Object>> EMPTY_PRIMARY_FILTERS =
|
||||
|
@ -60,11 +60,16 @@ public class TimelineStoreTestUtils {
|
|||
protected String entityId1b;
|
||||
protected String entityId2;
|
||||
protected String entityType2;
|
||||
protected String entityId4;
|
||||
protected String entityType4;
|
||||
protected String entityId5;
|
||||
protected String entityType5;
|
||||
protected Map<String, Set<Object>> primaryFilters;
|
||||
protected Map<String, Object> secondaryFilters;
|
||||
protected Map<String, Object> allFilters;
|
||||
protected Map<String, Object> otherInfo;
|
||||
protected Map<String, Set<String>> relEntityMap;
|
||||
protected Map<String, Set<String>> relEntityMap2;
|
||||
protected NameValuePair userFilter;
|
||||
protected NameValuePair numericFilter1;
|
||||
protected NameValuePair numericFilter2;
|
||||
|
@ -78,11 +83,13 @@ public class TimelineStoreTestUtils {
|
|||
protected Map<String, Object> eventInfo;
|
||||
protected List<TimelineEvent> events1;
|
||||
protected List<TimelineEvent> events2;
|
||||
protected long beforeTs;
|
||||
|
||||
/**
|
||||
* Load test data into the given store
|
||||
*/
|
||||
protected void loadTestData() throws IOException {
|
||||
beforeTs = System.currentTimeMillis()-1;
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
Map<String, Set<Object>> primaryFilters =
|
||||
new HashMap<String, Set<Object>>();
|
||||
|
@ -110,6 +117,10 @@ public class TimelineStoreTestUtils {
|
|||
String entityId1b = "id_2";
|
||||
String entityId2 = "id_2";
|
||||
String entityType2 = "type_2";
|
||||
String entityId4 = "id_4";
|
||||
String entityType4 = "type_4";
|
||||
String entityId5 = "id_5";
|
||||
String entityType5 = "type_5";
|
||||
|
||||
Map<String, Set<String>> relatedEntities =
|
||||
new HashMap<String, Set<String>>();
|
||||
|
@ -161,6 +172,13 @@ public class TimelineStoreTestUtils {
|
|||
assertEquals("badentityid", error.getEntityId());
|
||||
assertEquals("badentity", error.getEntityType());
|
||||
assertEquals(TimelinePutError.NO_START_TIME, error.getErrorCode());
|
||||
|
||||
relatedEntities.clear();
|
||||
relatedEntities.put(entityType5, Collections.singleton(entityId5));
|
||||
entities.setEntities(Collections.singletonList(createEntity(entityId4,
|
||||
entityType4, 42l, null, relatedEntities, null, null)));
|
||||
response = store.put(entities);
|
||||
assertEquals(0, response.getErrors().size());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -211,6 +229,10 @@ public class TimelineStoreTestUtils {
|
|||
entityId1b = "id_2";
|
||||
entityId2 = "id_2";
|
||||
entityType2 = "type_2";
|
||||
entityId4 = "id_4";
|
||||
entityType4 = "type_4";
|
||||
entityId5 = "id_5";
|
||||
entityType5 = "type_5";
|
||||
|
||||
ev1 = createEvent(123l, "start_event", null);
|
||||
|
||||
|
@ -228,6 +250,10 @@ public class TimelineStoreTestUtils {
|
|||
ids.add(entityId1b);
|
||||
relEntityMap.put(entityType1, ids);
|
||||
|
||||
relEntityMap2 =
|
||||
new HashMap<String, Set<String>>();
|
||||
relEntityMap2.put(entityType4, Collections.singleton(entityId4));
|
||||
|
||||
ev3 = createEvent(789l, "launch_event", null);
|
||||
ev4 = createEvent(-123l, "init_event", null);
|
||||
events2 = new ArrayList<TimelineEvent>();
|
||||
|
@ -241,16 +267,24 @@ public class TimelineStoreTestUtils {
|
|||
store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
|
||||
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, store.getEntity(entityId1, entityType1,
|
||||
EnumSet.allOf(Field.class)));
|
||||
primaryFilters, otherInfo, 123l, store.getEntity(entityId1,
|
||||
entityType1, EnumSet.allOf(Field.class)));
|
||||
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, store.getEntity(entityId1b, entityType1,
|
||||
EnumSet.allOf(Field.class)));
|
||||
primaryFilters, otherInfo, 123l, store.getEntity(entityId1b,
|
||||
entityType1, EnumSet.allOf(Field.class)));
|
||||
|
||||
verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
|
||||
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, store.getEntity(entityId2, entityType2,
|
||||
EnumSet.allOf(Field.class)));
|
||||
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2,
|
||||
entityType2, EnumSet.allOf(Field.class)));
|
||||
|
||||
verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
|
||||
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId4,
|
||||
entityType4, EnumSet.allOf(Field.class)));
|
||||
|
||||
verifyEntityInfo(entityId5, entityType5, EMPTY_EVENTS, relEntityMap2,
|
||||
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId5,
|
||||
entityType5, EnumSet.allOf(Field.class)));
|
||||
|
||||
// test getting single fields
|
||||
verifyEntityInfo(entityId1, entityType1, events1, null, null, null,
|
||||
|
@ -276,70 +310,132 @@ public class TimelineStoreTestUtils {
|
|||
EnumSet.of(Field.RELATED_ENTITIES)));
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntities(String entityType)
|
||||
throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, null, null,
|
||||
null, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesWithPrimaryFilter(
|
||||
String entityType, NameValuePair primaryFilter) throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, null, null,
|
||||
primaryFilter, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesFromId(String entityType,
|
||||
String fromId) throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, fromId, null,
|
||||
null, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesFromTs(String entityType,
|
||||
long fromTs) throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, null, fromTs,
|
||||
null, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilter(
|
||||
String entityType, NameValuePair primaryFilter, String fromId)
|
||||
throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, fromId, null,
|
||||
primaryFilter, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesFromTsWithPrimaryFilter(
|
||||
String entityType, NameValuePair primaryFilter, long fromTs)
|
||||
throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, null, fromTs,
|
||||
primaryFilter, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesFromIdWithWindow(String entityType,
|
||||
Long windowEnd, String fromId) throws IOException {
|
||||
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
|
||||
null, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilterAndWindow(
|
||||
String entityType, Long windowEnd, String fromId,
|
||||
NameValuePair primaryFilter) throws IOException {
|
||||
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
|
||||
primaryFilter, null, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntitiesWithFilters(String entityType,
|
||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
|
||||
throws IOException {
|
||||
return store.getEntities(entityType, null, null, null, null, null,
|
||||
primaryFilter, secondaryFilters, null).getEntities();
|
||||
}
|
||||
|
||||
protected List<TimelineEntity> getEntities(String entityType, Long limit,
|
||||
Long windowStart, Long windowEnd, NameValuePair primaryFilter,
|
||||
EnumSet<Field> fields) throws IOException {
|
||||
return store.getEntities(entityType, limit, windowStart, windowEnd, null,
|
||||
null, primaryFilter, null, fields).getEntities();
|
||||
}
|
||||
|
||||
public void testGetEntities() throws IOException {
|
||||
// test getting entities
|
||||
assertEquals("nonzero entities size for nonexistent type", 0,
|
||||
store.getEntities("type_0", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
getEntities("type_0").size());
|
||||
assertEquals("nonzero entities size for nonexistent type", 0,
|
||||
store.getEntities("type_3", null, null, null, null, null,
|
||||
null).getEntities().size());
|
||||
getEntities("type_3").size());
|
||||
assertEquals("nonzero entities size for nonexistent type", 0,
|
||||
store.getEntities("type_0", null, null, null, userFilter,
|
||||
null, null).getEntities().size());
|
||||
getEntities("type_6").size());
|
||||
assertEquals("nonzero entities size for nonexistent type", 0,
|
||||
store.getEntities("type_3", null, null, null, userFilter,
|
||||
null, null).getEntities().size());
|
||||
getEntitiesWithPrimaryFilter("type_0", userFilter).size());
|
||||
assertEquals("nonzero entities size for nonexistent type", 0,
|
||||
getEntitiesWithPrimaryFilter("type_3", userFilter).size());
|
||||
assertEquals("nonzero entities size for nonexistent type", 0,
|
||||
getEntitiesWithPrimaryFilter("type_6", userFilter).size());
|
||||
|
||||
List<TimelineEntity> entities =
|
||||
store.getEntities("type_1", null, null, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
List<TimelineEntity> entities = getEntities("type_1");
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = store.getEntities("type_2", null, null, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_2");
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
|
||||
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0));
|
||||
|
||||
entities = store.getEntities("type_1", 1l, null, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", 1l, null, null, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
entities = store.getEntities("type_1", 1l, 0l, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", 1l, 0l, null, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
entities = store.getEntities("type_1", null, 234l, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, 234l, null, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", null, 123l, null, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, 123l, null, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", null, 234l, 345l, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, 234l, 345l, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", null, null, 345l, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, null, 345l, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = store.getEntities("type_1", null, null, 123l, null, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, null, 123l, null,
|
||||
EnumSet.allOf(Field.class));
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
@ -347,79 +443,152 @@ public class TimelineStoreTestUtils {
|
|||
primaryFilters, otherInfo, entities.get(1));
|
||||
}
|
||||
|
||||
public void testGetEntitiesWithFromId() throws IOException {
|
||||
List<TimelineEntity> entities = getEntitiesFromId("type_1", entityId1);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = getEntitiesFromId("type_1", entityId1b);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
entities = getEntitiesFromIdWithWindow("type_1", 0l, entityId1);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = getEntitiesFromId("type_2", "a");
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = getEntitiesFromId("type_2", entityId2);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
|
||||
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0));
|
||||
|
||||
entities = getEntitiesFromIdWithWindow("type_2", -456l, null);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = getEntitiesFromIdWithWindow("type_2", -456l, "a");
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = getEntitiesFromIdWithWindow("type_2", 0l, null);
|
||||
assertEquals(1, entities.size());
|
||||
|
||||
entities = getEntitiesFromIdWithWindow("type_2", 0l, entityId2);
|
||||
assertEquals(1, entities.size());
|
||||
|
||||
// same tests with primary filters
|
||||
entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter,
|
||||
entityId1);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter,
|
||||
entityId1b);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
entities = getEntitiesFromIdWithPrimaryFilterAndWindow("type_1", 0l,
|
||||
entityId1, userFilter);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = getEntitiesFromIdWithPrimaryFilter("type_2", userFilter, "a");
|
||||
assertEquals(0, entities.size());
|
||||
}
|
||||
|
||||
public void testGetEntitiesWithFromTs() throws IOException {
|
||||
assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size());
|
||||
assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size());
|
||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
beforeTs).size());
|
||||
long afterTs = System.currentTimeMillis();
|
||||
assertEquals(2, getEntitiesFromTs("type_1", afterTs).size());
|
||||
assertEquals(1, getEntitiesFromTs("type_2", afterTs).size());
|
||||
assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
afterTs).size());
|
||||
assertEquals(2, getEntities("type_1").size());
|
||||
assertEquals(1, getEntities("type_2").size());
|
||||
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
|
||||
// check insert time is not overwritten
|
||||
long beforeTs = this.beforeTs;
|
||||
loadTestData();
|
||||
assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size());
|
||||
assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size());
|
||||
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
beforeTs).size());
|
||||
assertEquals(2, getEntitiesFromTs("type_1", afterTs).size());
|
||||
assertEquals(1, getEntitiesFromTs("type_2", afterTs).size());
|
||||
assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
|
||||
afterTs).size());
|
||||
}
|
||||
|
||||
public void testGetEntitiesWithPrimaryFilters() throws IOException {
|
||||
// test using primary filter
|
||||
assertEquals("nonzero entities size for primary filter", 0,
|
||||
store.getEntities("type_1", null, null, null,
|
||||
new NameValuePair("none", "none"), null,
|
||||
EnumSet.allOf(Field.class)).getEntities().size());
|
||||
getEntitiesWithPrimaryFilter("type_1",
|
||||
new NameValuePair("none", "none")).size());
|
||||
assertEquals("nonzero entities size for primary filter", 0,
|
||||
store.getEntities("type_2", null, null, null,
|
||||
new NameValuePair("none", "none"), null,
|
||||
EnumSet.allOf(Field.class)).getEntities().size());
|
||||
getEntitiesWithPrimaryFilter("type_2",
|
||||
new NameValuePair("none", "none")).size());
|
||||
assertEquals("nonzero entities size for primary filter", 0,
|
||||
store.getEntities("type_3", null, null, null,
|
||||
new NameValuePair("none", "none"), null,
|
||||
EnumSet.allOf(Field.class)).getEntities().size());
|
||||
getEntitiesWithPrimaryFilter("type_3",
|
||||
new NameValuePair("none", "none")).size());
|
||||
|
||||
List<TimelineEntity> entities = store.getEntities("type_1", null, null, null,
|
||||
userFilter, null, EnumSet.allOf(Field.class)).getEntities();
|
||||
List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1",
|
||||
userFilter);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
store.getEntities("type_1", null, null, null,
|
||||
numericFilter1, null, EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithPrimaryFilter("type_1", numericFilter1);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
store.getEntities("type_1", null, null, null,
|
||||
numericFilter2, null, EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithPrimaryFilter("type_1", numericFilter2);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
store.getEntities("type_1", null, null, null,
|
||||
numericFilter3, null, EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithPrimaryFilter("type_1", numericFilter3);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = store.getEntities("type_2", null, null, null, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithPrimaryFilter("type_2", userFilter);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", 1l, null, null, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", 1l, null, null, userFilter, null);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
entities = store.getEntities("type_1", 1l, 0l, null, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", 1l, 0l, null, userFilter, null);
|
||||
assertEquals(1, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
||||
entities = store.getEntities("type_1", null, 234l, null, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, 234l, null, userFilter, null);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", null, 234l, 345l, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, 234l, 345l, userFilter, null);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", null, null, 345l, userFilter, null,
|
||||
EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntities("type_1", null, null, 345l, userFilter, null);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
|
@ -429,28 +598,29 @@ public class TimelineStoreTestUtils {
|
|||
|
||||
public void testGetEntitiesWithSecondaryFilters() throws IOException {
|
||||
// test using secondary filter
|
||||
List<TimelineEntity> entities = store.getEntities("type_1", null, null, null,
|
||||
null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
|
||||
List<TimelineEntity> entities = getEntitiesWithFilters("type_1", null,
|
||||
goodTestingFilters);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = store.getEntities("type_1", null, null, null, userFilter,
|
||||
goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithFilters("type_1", userFilter, goodTestingFilters);
|
||||
assertEquals(2, entities.size());
|
||||
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(0));
|
||||
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||
primaryFilters, otherInfo, entities.get(1));
|
||||
|
||||
entities = store.getEntities("type_1", null, null, null, null,
|
||||
badTestingFilters, EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithFilters("type_1", null,
|
||||
Collections.singleton(new NameValuePair("user", "none")));
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = store.getEntities("type_1", null, null, null, userFilter,
|
||||
badTestingFilters, EnumSet.allOf(Field.class)).getEntities();
|
||||
entities = getEntitiesWithFilters("type_1", null, badTestingFilters);
|
||||
assertEquals(0, entities.size());
|
||||
|
||||
entities = getEntitiesWithFilters("type_1", userFilter, badTestingFilters);
|
||||
assertEquals(0, entities.size());
|
||||
}
|
||||
|
||||
|
@ -514,6 +684,19 @@ public class TimelineStoreTestUtils {
|
|||
verifyEntityTimeline(timelines.get(0), entityId2, entityType2, ev3, ev4);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify a single entity and its start time
|
||||
*/
|
||||
protected static void verifyEntityInfo(String entityId, String entityType,
|
||||
List<TimelineEvent> events, Map<String, Set<String>> relatedEntities,
|
||||
Map<String, Set<Object>> primaryFilters, Map<String, Object> otherInfo,
|
||||
Long startTime, TimelineEntity retrievedEntityInfo) {
|
||||
|
||||
verifyEntityInfo(entityId, entityType, events, relatedEntities,
|
||||
primaryFilters, otherInfo, retrievedEntityInfo);
|
||||
assertEquals(startTime, retrievedEntityInfo.getStartTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify a single entity
|
||||
*/
|
||||
|
|
|
@ -50,6 +50,7 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
|
|||
public class TestTimelineWebServices extends JerseyTest {
|
||||
|
||||
private static TimelineStore store;
|
||||
private long beforeTime;
|
||||
|
||||
private Injector injector = Guice.createInjector(new ServletModule() {
|
||||
|
||||
|
@ -79,6 +80,7 @@ public class TestTimelineWebServices extends JerseyTest {
|
|||
|
||||
private TimelineStore mockTimelineStore()
|
||||
throws Exception {
|
||||
beforeTime = System.currentTimeMillis() - 1;
|
||||
TestMemoryTimelineStore store =
|
||||
new TestMemoryTimelineStore();
|
||||
store.setup();
|
||||
|
@ -141,6 +143,47 @@ public class TestTimelineWebServices extends JerseyTest {
|
|||
verifyEntities(response.getEntity(TimelineEntities.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromId() throws Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||
.path("type_1").queryParam("fromId", "id_2")
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
assertEquals(1, response.getEntity(TimelineEntities.class).getEntities()
|
||||
.size());
|
||||
|
||||
response = r.path("ws").path("v1").path("timeline")
|
||||
.path("type_1").queryParam("fromId", "id_1")
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
assertEquals(2, response.getEntity(TimelineEntities.class).getEntities()
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromTs() throws Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("timeline")
|
||||
.path("type_1").queryParam("fromTs", Long.toString(beforeTime))
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
assertEquals(0, response.getEntity(TimelineEntities.class).getEntities()
|
||||
.size());
|
||||
|
||||
response = r.path("ws").path("v1").path("timeline")
|
||||
.path("type_1").queryParam("fromTs", Long.toString(
|
||||
System.currentTimeMillis()))
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
|
||||
assertEquals(2, response.getEntity(TimelineEntities.class).getEntities()
|
||||
.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrimaryFilterString() {
|
||||
WebResource r = resource();
|
||||
|
|
Loading…
Reference in New Issue