From 5f0f16c0f6761205d3e0d2ba5389fafbc6fbadef Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Mon, 24 Mar 2014 18:16:27 +0000 Subject: [PATCH] YARN-1838. Enhanced timeline service getEntities API to get entities from a given entity ID or insertion timestamp. Contributed by Billie Rinaldi. svn merge --ignore-ancestry -c 1580960 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580961 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../TestDistributedShell.java | 4 +- .../timeline/GenericObjectMapper.java | 10 +- .../timeline/LeveldbTimelineStore.java | 137 +++++--- .../timeline/MemoryTimelineStore.java | 44 ++- .../timeline/TimelineReader.java | 20 +- .../webapp/TimelineWebServices.java | 4 + .../timeline/TestLeveldbTimelineStore.java | 107 +++--- .../timeline/TestMemoryTimelineStore.java | 15 +- .../timeline/TimelineStoreTestUtils.java | 327 ++++++++++++++---- .../webapp/TestTimelineWebServices.java | 43 +++ 11 files changed, 538 insertions(+), 176 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b7b8e759a7a..8726df37ef7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -163,6 +163,9 @@ Release 2.4.0 - UNRELEASED use smaps for obtaining used memory information. (Rajesh Balamohan via vinodkv) + YARN-1838. Enhanced timeline service getEntities API to get entities from a + given entity ID or insertion timestamp. (Billie Rinaldi via zjshen) + IMPROVEMENTS YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6b84d2f4076..ccee8583e98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -192,7 +192,7 @@ public class TestDistributedShell { .getApplicationHistoryServer() .getTimelineStore() .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(), - null, null, null, null, null, null); + null, null, null, null, null, null, null, null); Assert.assertNotNull(entitiesAttempts); Assert.assertEquals(1, entitiesAttempts.getEntities().size()); Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents() @@ -203,7 +203,7 @@ public class TestDistributedShell { .getApplicationHistoryServer() .getTimelineStore() .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, - null, null, null, null, null); + null, null, null, null, null, null, null); Assert.assertNotNull(entities); Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals(entities.getEntities().get(0).getEntityType() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java index 36448b37830..b1846a35758 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/GenericObjectMapper.java @@ -102,11 +102,15 @@ public class GenericObjectMapper { */ public static byte[] writeReverseOrderedLong(long l) { byte[] b = new byte[8]; - b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff)); - for (int i = 1; i < 7; i++) { + return writeReverseOrderedLong(l, b, 0); + } + + public static byte[] writeReverseOrderedLong(long l, byte[] b, int offset) { + b[offset] = (byte)(0x7f ^ ((l >> 56) & 0xff)); + for (int i = offset+1; i < offset+7; i++) { b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff)); } - b[7] = (byte)(0xff ^ (l & 0xff)); + b[offset+7] = (byte)(0xff ^ (l & 0xff)); return b; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java index e26406c04e1..edd48426976 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java @@ -135,7 +135,7 @@ public class LeveldbTimelineStore extends AbstractService private static final byte[] EMPTY_BYTES = new byte[0]; - private Map startTimeWriteCache; + private Map startTimeWriteCache; private Map startTimeReadCache; /** @@ -205,6 +205,16 @@ public class LeveldbTimelineStore extends AbstractService super.serviceStop(); } + private static class StartAndInsertTime { + final long startTime; + final long insertTime; + + public StartAndInsertTime(long startTime, long insertTime) { + this.startTime = startTime; + this.insertTime = insertTime; + } + } + private class EntityDeletionThread extends Thread { private final long ttl; private final long ttlInterval; @@ -585,14 +595,14 @@ public class LeveldbTimelineStore extends AbstractService @Override public TimelineEntities getEntities(String entityType, - Long limit, Long windowStart, Long windowEnd, + Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection secondaryFilters, EnumSet fields) throws IOException { if (primaryFilter == null) { // if no primary filter is specified, prefix the lookup with // ENTITY_ENTRY_PREFIX return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, - windowStart, windowEnd, secondaryFilters, fields); + windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields); } else { // if a primary filter is specified, prefix the lookup with // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + @@ -602,7 +612,7 @@ public class LeveldbTimelineStore extends AbstractService .add(GenericObjectMapper.write(primaryFilter.getValue()), true) .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); return getEntityByTime(base, entityType, limit, windowStart, windowEnd, - secondaryFilters, fields); + fromId, fromTs, secondaryFilters, fields); } } @@ -614,6 +624,8 @@ public class LeveldbTimelineStore extends AbstractService * @param limit A limit on the number of entities to return * @param starttime The earliest entity start time to retrieve (exclusive) * @param endtime The latest entity start time to retrieve (inclusive) + * @param fromId Retrieve entities starting with this entity + * @param fromTs Ignore entities with insert timestamp later than this ts * @param secondaryFilters Filter pairs that the entities should match * @param fields The set of fields to retrieve * @return A list of entities @@ -621,8 +633,8 @@ public class LeveldbTimelineStore extends AbstractService */ private TimelineEntities getEntityByTime(byte[] base, String entityType, Long limit, Long starttime, Long endtime, - Collection secondaryFilters, EnumSet fields) - throws IOException { + String fromId, Long fromTs, Collection secondaryFilters, + EnumSet fields) throws IOException { DBIterator iterator = null; try { KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); @@ -632,10 +644,25 @@ public class LeveldbTimelineStore extends AbstractService // if end time is null, place no restriction on end time endtime = Long.MAX_VALUE; } - // using end time, construct a first key that will be seeked to - byte[] revts = writeReverseOrderedLong(endtime); - kb.add(revts); - byte[] first = kb.getBytesForLookup(); + // construct a first key that will be seeked to using end time or fromId + byte[] first = null; + if (fromId != null) { + Long fromIdStartTime = getStartTimeLong(fromId, entityType); + if (fromIdStartTime == null) { + // no start time for provided id, so return empty entities + return new TimelineEntities(); + } + if (fromIdStartTime <= endtime) { + // if provided id's start time falls before the end of the window, + // use it to construct the seek key + first = kb.add(writeReverseOrderedLong(fromIdStartTime)) + .add(fromId).getBytesForLookup(); + } + } + // if seek key wasn't constructed using fromId, construct it using end ts + if (first == null) { + first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); + } byte[] last = null; if (starttime != null) { // if start time is not null, set a last key that will not be @@ -665,6 +692,21 @@ public class LeveldbTimelineStore extends AbstractService KeyParser kp = new KeyParser(key, prefix.length); Long startTime = kp.getNextLong(); String entityId = kp.getNextString(); + + if (fromTs != null) { + long insertTime = readReverseOrderedLong(iterator.peekNext() + .getValue(), 0); + if (insertTime > fromTs) { + byte[] firstKey = key; + while (iterator.hasNext() && prefixMatches(firstKey, + kp.getOffset(), key)) { + iterator.next(); + key = iterator.peekNext().getKey(); + } + continue; + } + } + // parse the entity that owns this key, iterating over all keys for // the entity TimelineEntity entity = getEntity(entityId, entityType, startTime, @@ -715,9 +757,10 @@ public class LeveldbTimelineStore extends AbstractService writeBatch = db.createWriteBatch(); List events = entity.getEvents(); // look up the start time for the entity - revStartTime = getAndSetStartTime(entity.getEntityId(), - entity.getEntityType(), entity.getStartTime(), events); - if (revStartTime == null) { + StartAndInsertTime startAndInsertTime = getAndSetStartTime( + entity.getEntityId(), entity.getEntityType(), + entity.getStartTime(), events); + if (startAndInsertTime == null) { // if no start time is found, add an error and return TimelinePutError error = new TimelinePutError(); error.setEntityId(entity.getEntityId()); @@ -726,11 +769,19 @@ public class LeveldbTimelineStore extends AbstractService response.addError(error); return; } + revStartTime = writeReverseOrderedLong(startAndInsertTime + .startTime); + Map> primaryFilters = entity.getPrimaryFilters(); // write entity marker - writeBatch.put(createEntityMarkerKey(entity.getEntityId(), - entity.getEntityType(), revStartTime), EMPTY_BYTES); + byte[] markerKey = createEntityMarkerKey(entity.getEntityId(), + entity.getEntityType(), revStartTime); + byte[] markerValue = writeReverseOrderedLong(startAndInsertTime + .insertTime); + writeBatch.put(markerKey, markerValue); + writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey, + markerValue); // write event entries if (events != null && !events.isEmpty()) { @@ -821,17 +872,21 @@ public class LeveldbTimelineStore extends AbstractService lock = writeLocks.getLock(relatedEntity); lock.lock(); try { - byte[] relatedEntityStartTime = getAndSetStartTime( - relatedEntity.getId(), relatedEntity.getType(), + StartAndInsertTime relatedEntityStartAndInsertTime = + getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(), readReverseOrderedLong(revStartTime, 0), null); - if (relatedEntityStartTime == null) { + if (relatedEntityStartAndInsertTime == null) { throw new IOException("Error setting start time for related entity"); } + byte[] relatedEntityStartTime = writeReverseOrderedLong( + relatedEntityStartAndInsertTime.startTime); db.put(createRelatedEntityKey(relatedEntity.getId(), relatedEntity.getType(), relatedEntityStartTime, entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES); db.put(createEntityMarkerKey(relatedEntity.getId(), - relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES); + relatedEntity.getType(), relatedEntityStartTime), + writeReverseOrderedLong(relatedEntityStartAndInsertTime + .insertTime)); } catch (IOException e) { LOG.error("Error putting related entity " + relatedEntity.getId() + " of type " + relatedEntity.getType() + " for entity " + @@ -937,19 +992,18 @@ public class LeveldbTimelineStore extends AbstractService * @param entityType The type of the entity * @param startTime The start time of the entity, or null * @param events A list of events for the entity, or null - * @return A byte array + * @return A StartAndInsertTime * @throws IOException */ - private byte[] getAndSetStartTime(String entityId, String entityType, - Long startTime, List events) + private StartAndInsertTime getAndSetStartTime(String entityId, + String entityType, Long startTime, List events) throws IOException { EntityIdentifier entity = new EntityIdentifier(entityId, entityType); if (startTime == null) { // start time is not provided, so try to look it up if (startTimeWriteCache.containsKey(entity)) { // found the start time in the cache - startTime = startTimeWriteCache.get(entity); - return writeReverseOrderedLong(startTime); + return startTimeWriteCache.get(entity); } else { if (events != null) { // prepare a start time from events in case it is needed @@ -966,14 +1020,8 @@ public class LeveldbTimelineStore extends AbstractService } else { // start time is provided if (startTimeWriteCache.containsKey(entity)) { - // check the provided start time matches the cache - if (!startTime.equals(startTimeWriteCache.get(entity))) { - // the start time is already in the cache, - // and it is different from the provided start time, - // so use the one from the cache - startTime = startTimeWriteCache.get(entity); - } - return writeReverseOrderedLong(startTime); + // always use start time from cache if it exists + return startTimeWriteCache.get(entity); } else { // check the provided start time matches the db return checkStartTimeInDb(entity, startTime); @@ -988,31 +1036,36 @@ public class LeveldbTimelineStore extends AbstractService * so it adds it back into the cache if it is found. Should only be called * when a lock has been obtained on the entity. */ - private byte[] checkStartTimeInDb(EntityIdentifier entity, + private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity, Long suggestedStartTime) throws IOException { + StartAndInsertTime startAndInsertTime = null; // create lookup key for start time byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); // retrieve value for key byte[] v = db.get(b); - byte[] revStartTime; if (v == null) { // start time doesn't exist in db if (suggestedStartTime == null) { return null; } + startAndInsertTime = new StartAndInsertTime(suggestedStartTime, + System.currentTimeMillis()); + // write suggested start time - revStartTime = writeReverseOrderedLong(suggestedStartTime); + v = new byte[16]; + writeReverseOrderedLong(suggestedStartTime, v, 0); + writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8); WriteOptions writeOptions = new WriteOptions(); writeOptions.sync(true); - db.put(b, revStartTime, writeOptions); + db.put(b, v, writeOptions); } else { // found start time in db, so ignore suggested start time - suggestedStartTime = readReverseOrderedLong(v, 0); - revStartTime = v; + startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0), + readReverseOrderedLong(v, 8)); } - startTimeWriteCache.put(entity, suggestedStartTime); - startTimeReadCache.put(entity, suggestedStartTime); - return revStartTime; + startTimeWriteCache.put(entity, startAndInsertTime); + startTimeReadCache.put(entity, startAndInsertTime.startTime); + return startAndInsertTime; } /** @@ -1245,8 +1298,6 @@ public class LeveldbTimelineStore extends AbstractService } } - // warning is suppressed to prevent eclipse from noting unclosed resource - @SuppressWarnings("resource") @VisibleForTesting boolean deleteNextEntity(String entityType, byte[] reverseTimestamp, DBIterator iterator, DBIterator pfIterator, boolean seeked) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java index 0bd666f0743..86ac1f81107 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java @@ -24,12 +24,14 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.PriorityQueue; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -56,6 +58,8 @@ public class MemoryTimelineStore private Map entities = new HashMap(); + private Map entityInsertTimes = + new HashMap(); public MemoryTimelineStore() { super(MemoryTimelineStore.class.getName()); @@ -63,8 +67,9 @@ public class MemoryTimelineStore @Override public TimelineEntities getEntities(String entityType, Long limit, - Long windowStart, Long windowEnd, NameValuePair primaryFilter, - Collection secondaryFilters, EnumSet fields) { + Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fields) { if (limit == null) { limit = DEFAULT_LIMIT; } @@ -77,8 +82,26 @@ public class MemoryTimelineStore if (fields == null) { fields = EnumSet.allOf(Field.class); } + + Iterator entityIterator = null; + if (fromId != null) { + TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, + entityType)); + if (firstEntity == null) { + return new TimelineEntities(); + } else { + entityIterator = new TreeSet(entities.values()) + .tailSet(firstEntity, true).iterator(); + } + } + if (entityIterator == null) { + entityIterator = new PriorityQueue(entities.values()) + .iterator(); + } + List entitiesSelected = new ArrayList(); - for (TimelineEntity entity : new PriorityQueue(entities.values())) { + while (entityIterator.hasNext()) { + TimelineEntity entity = entityIterator.next(); if (entitiesSelected.size() >= limit) { break; } @@ -91,6 +114,10 @@ public class MemoryTimelineStore if (entity.getStartTime() > windowEnd) { continue; } + if (fromTs != null && entityInsertTimes.get(new EntityIdentifier( + entity.getEntityId(), entity.getEntityType())) > fromTs) { + continue; + } if (primaryFilter != null && !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) { continue; @@ -196,6 +223,7 @@ public class MemoryTimelineStore existingEntity.setEntityType(entity.getEntityType()); existingEntity.setStartTime(entity.getStartTime()); entities.put(entityId, existingEntity); + entityInsertTimes.put(entityId, System.currentTimeMillis()); } if (entity.getEvents() != null) { if (existingEntity.getEvents() == null) { @@ -215,9 +243,16 @@ public class MemoryTimelineStore error.setErrorCode(TimelinePutError.NO_START_TIME); response.addError(error); entities.remove(entityId); + entityInsertTimes.remove(entityId); continue; } else { - existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp()); + Long min = Long.MAX_VALUE; + for (TimelineEvent e : entity.getEvents()) { + if (min > e.getTimestamp()) { + min = e.getTimestamp(); + } + } + existingEntity.setStartTime(min); } } if (entity.getPrimaryFilters() != null) { @@ -264,6 +299,7 @@ public class MemoryTimelineStore relatedEntity.addRelatedEntity(existingEntity.getEntityType(), existingEntity.getEntityId()); entities.put(relatedEntityId, relatedEntity); + entityInsertTimes.put(relatedEntityId, System.currentTimeMillis()); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java index f0be2dff1cf..9ae9954dec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java @@ -55,8 +55,11 @@ public interface TimelineReader { final long DEFAULT_LIMIT = 100; /** - * This method retrieves a list of entity information, {@link TimelineEntity}, sorted - * by the starting timestamp for the entity, descending. + * This method retrieves a list of entity information, {@link TimelineEntity}, + * sorted by the starting timestamp for the entity, descending. The starting + * timestamp of an entity is a timestamp specified by the client. If it is not + * explicitly specified, it will be chosen by the store to be the earliest + * timestamp of the events received in the first put for the entity. * * @param entityType * The type of entities to return (required). @@ -69,6 +72,17 @@ public interface TimelineReader { * @param windowEnd * The latest start timestamp to retrieve (inclusive). If null, * defaults to {@link Long#MAX_VALUE} + * @param fromId + * If fromId is not null, retrieve entities earlier than and + * including the specified ID. If no start time is found for the + * specified ID, an empty list of entities will be returned. The + * windowEnd parameter will take precedence if the start time of this + * entity falls later than windowEnd. + * @param fromTs + * If fromTs is not null, ignore entities that were inserted into the + * store after the given timestamp. The entity's insert timestamp + * used for this comparison is the store's system time when the first + * put for the entity was received (not the entity's start time). * @param primaryFilter * Retrieves only entities that have the specified primary filter. If * null, retrieves all entities. This is an indexed retrieval, and no @@ -88,7 +102,7 @@ public interface TimelineReader { * @throws IOException */ TimelineEntities getEntities(String entityType, - Long limit, Long windowStart, Long windowEnd, + Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection secondaryFilters, EnumSet fieldsToRetrieve) throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 455f22b3ce3..44567ea229a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -134,6 +134,8 @@ public class TimelineWebServices { @QueryParam("secondaryFilter") String secondaryFilter, @QueryParam("windowStart") String windowStart, @QueryParam("windowEnd") String windowEnd, + @QueryParam("fromId") String fromId, + @QueryParam("fromTs") String fromTs, @QueryParam("limit") String limit, @QueryParam("fields") String fields) { init(res); @@ -144,6 +146,8 @@ public class TimelineWebServices { parseLongStr(limit), parseLongStr(windowStart), parseLongStr(windowEnd), + parseStr(fromId), + parseLongStr(fromTs), parsePairStr(primaryFilter, ":"), parsePairsStr(secondaryFilter, ",", ":"), parseFieldsStr(fields, ",")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java index e89a9d24b20..9b273090bd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; import java.io.File; import java.io.IOException; import java.util.Collections; -import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field; import org.iq80.leveldb.DBIterator; import org.junit.After; import org.junit.Before; @@ -46,8 +44,7 @@ import static org.junit.Assert.assertEquals; @InterfaceAudience.Private @InterfaceStability.Unstable -public class TestLeveldbTimelineStore - extends TimelineStoreTestUtils { +public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { private FileContext fsContext; private File fsPath; @@ -87,6 +84,16 @@ public class TestLeveldbTimelineStore super.testGetEntities(); } + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + super.testGetEntitiesWithFromTs(); + } + @Test public void testGetEntitiesWithPrimaryFilters() throws IOException { super.testGetEntitiesWithPrimaryFilters(); @@ -135,55 +142,45 @@ public class TestLeveldbTimelineStore @Test public void testGetEntityTypes() throws IOException { List entityTypes = ((LeveldbTimelineStore)store).getEntityTypes(); - assertEquals(2, entityTypes.size()); + assertEquals(4, entityTypes.size()); assertEquals(entityType1, entityTypes.get(0)); assertEquals(entityType2, entityTypes.get(1)); + assertEquals(entityType4, entityTypes.get(2)); + assertEquals(entityType5, entityTypes.get(3)); } @Test public void testDeleteEntities() throws IOException, InterruptedException { - assertEquals(2, store.getEntities("type_1", null, null, null, null, null, - null).getEntities().size()); - assertEquals(1, store.getEntities("type_2", null, null, null, null, null, - null).getEntities().size()); + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); assertEquals(false, deleteNextEntity(entityType1, writeReverseOrderedLong(122l))); - assertEquals(2, store.getEntities("type_1", null, null, null, null, null, - null).getEntities().size()); - assertEquals(1, store.getEntities("type_2", null, null, null, null, null, - null).getEntities().size()); + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); assertEquals(true, deleteNextEntity(entityType1, writeReverseOrderedLong(123l))); - List entities = - store.getEntities("type_2", null, null, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + List entities = getEntities("type_2"); assertEquals(1, entities.size()); verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap( entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0)); - entities = store.getEntities("type_1", null, null, null, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithPrimaryFilter("type_1", userFilter); assertEquals(1, entities.size()); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); ((LeveldbTimelineStore)store).discardOldEntities(-123l); - assertEquals(1, store.getEntities("type_1", null, null, null, null, null, - null).getEntities().size()); - assertEquals(0, store.getEntities("type_2", null, null, null, null, null, - null).getEntities().size()); - assertEquals(1, ((LeveldbTimelineStore)store).getEntityTypes().size()); + assertEquals(1, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); + assertEquals(3, ((LeveldbTimelineStore)store).getEntityTypes().size()); ((LeveldbTimelineStore)store).discardOldEntities(123l); - assertEquals(0, store.getEntities("type_1", null, null, null, null, null, - null).getEntities().size()); - assertEquals(0, store.getEntities("type_2", null, null, null, null, null, - null).getEntities().size()); + assertEquals(0, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); - assertEquals(0, store.getEntities("type_1", null, null, null, userFilter, - null, null).getEntities().size()); + assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); } @Test @@ -200,14 +197,13 @@ public class TestLeveldbTimelineStore assertEquals(0, response.getErrors().size()); NameValuePair pfPair = new NameValuePair("user", "otheruser"); - List entities = store.getEntities("type_1", null, null, - null, pfPair, null, null).getEntities(); + List entities = getEntitiesWithPrimaryFilter("type_1", + pfPair); assertEquals(1, entities.size()); verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2), EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0)); - entities = store.getEntities("type_1", null, null, null, - userFilter, null, null).getEntities(); + entities = getEntitiesWithPrimaryFilter("type_1", userFilter); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); @@ -215,22 +211,43 @@ public class TestLeveldbTimelineStore primaryFilters, otherInfo, entities.get(1)); ((LeveldbTimelineStore)store).discardOldEntities(-123l); - assertEquals(1, store.getEntities("type_1", null, null, null, pfPair, null, - null).getEntities().size()); - assertEquals(2, store.getEntities("type_1", null, null, null, userFilter, - null, null).getEntities().size()); + assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); + assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); ((LeveldbTimelineStore)store).discardOldEntities(123l); - assertEquals(0, store.getEntities("type_1", null, null, null, null, null, - null).getEntities().size()); - assertEquals(0, store.getEntities("type_2", null, null, null, null, null, - null).getEntities().size()); + assertEquals(0, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); - assertEquals(0, store.getEntities("type_1", null, null, null, pfPair, null, - null).getEntities().size()); - assertEquals(0, store.getEntities("type_1", null, null, null, userFilter, - null, null).getEntities().size()); + assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); + assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); + } + + @Test + public void testFromTsWithDeletion() + throws IOException, InterruptedException { + long l = System.currentTimeMillis(); + assertEquals(2, getEntitiesFromTs("type_1", l).size()); + assertEquals(1, getEntitiesFromTs("type_2", l).size()); + assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + ((LeveldbTimelineStore)store).discardOldEntities(123l); + assertEquals(0, getEntitiesFromTs("type_1", l).size()); + assertEquals(0, getEntitiesFromTs("type_2", l).size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + assertEquals(0, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + loadTestData(); + assertEquals(0, getEntitiesFromTs("type_1", l).size()); + assertEquals(0, getEntitiesFromTs("type_2", l).size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); + assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java index 49ab53f03bc..415de536793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java @@ -19,16 +19,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; -public class TestMemoryTimelineStore - extends TimelineStoreTestUtils { +public class TestMemoryTimelineStore extends TimelineStoreTestUtils { @Before public void setup() throws Exception { @@ -58,6 +55,16 @@ public class TestMemoryTimelineStore super.testGetEntities(); } + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + super.testGetEntitiesWithFromTs(); + } + @Test public void testGetEntitiesWithPrimaryFilters() throws IOException { super.testGetEntitiesWithPrimaryFilters(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStoreTestUtils.java index 3b58a00178c..d76053653ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStoreTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStoreTestUtils.java @@ -41,12 +41,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field; public class TimelineStoreTestUtils { + protected static final List EMPTY_EVENTS = + Collections.emptyList(); protected static final Map EMPTY_MAP = Collections.emptyMap(); protected static final Map> EMPTY_PRIMARY_FILTERS = @@ -60,11 +60,16 @@ public class TimelineStoreTestUtils { protected String entityId1b; protected String entityId2; protected String entityType2; + protected String entityId4; + protected String entityType4; + protected String entityId5; + protected String entityType5; protected Map> primaryFilters; protected Map secondaryFilters; protected Map allFilters; protected Map otherInfo; protected Map> relEntityMap; + protected Map> relEntityMap2; protected NameValuePair userFilter; protected NameValuePair numericFilter1; protected NameValuePair numericFilter2; @@ -78,11 +83,13 @@ public class TimelineStoreTestUtils { protected Map eventInfo; protected List events1; protected List events2; + protected long beforeTs; /** * Load test data into the given store */ protected void loadTestData() throws IOException { + beforeTs = System.currentTimeMillis()-1; TimelineEntities entities = new TimelineEntities(); Map> primaryFilters = new HashMap>(); @@ -110,6 +117,10 @@ public class TimelineStoreTestUtils { String entityId1b = "id_2"; String entityId2 = "id_2"; String entityType2 = "type_2"; + String entityId4 = "id_4"; + String entityType4 = "type_4"; + String entityId5 = "id_5"; + String entityType5 = "type_5"; Map> relatedEntities = new HashMap>(); @@ -161,6 +172,13 @@ public class TimelineStoreTestUtils { assertEquals("badentityid", error.getEntityId()); assertEquals("badentity", error.getEntityType()); assertEquals(TimelinePutError.NO_START_TIME, error.getErrorCode()); + + relatedEntities.clear(); + relatedEntities.put(entityType5, Collections.singleton(entityId5)); + entities.setEntities(Collections.singletonList(createEntity(entityId4, + entityType4, 42l, null, relatedEntities, null, null))); + response = store.put(entities); + assertEquals(0, response.getErrors().size()); } /** @@ -211,6 +229,10 @@ public class TimelineStoreTestUtils { entityId1b = "id_2"; entityId2 = "id_2"; entityType2 = "type_2"; + entityId4 = "id_4"; + entityType4 = "type_4"; + entityId5 = "id_5"; + entityType5 = "type_5"; ev1 = createEvent(123l, "start_event", null); @@ -228,6 +250,10 @@ public class TimelineStoreTestUtils { ids.add(entityId1b); relEntityMap.put(entityType1, ids); + relEntityMap2 = + new HashMap>(); + relEntityMap2.put(entityType4, Collections.singleton(entityId4)); + ev3 = createEvent(789l, "launch_event", null); ev4 = createEvent(-123l, "init_event", null); events2 = new ArrayList(); @@ -241,16 +267,24 @@ public class TimelineStoreTestUtils { store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class))); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, - primaryFilters, otherInfo, store.getEntity(entityId1, entityType1, - EnumSet.allOf(Field.class))); + primaryFilters, otherInfo, 123l, store.getEntity(entityId1, + entityType1, EnumSet.allOf(Field.class))); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, - primaryFilters, otherInfo, store.getEntity(entityId1b, entityType1, - EnumSet.allOf(Field.class))); + primaryFilters, otherInfo, 123l, store.getEntity(entityId1b, + entityType1, EnumSet.allOf(Field.class))); verifyEntityInfo(entityId2, entityType2, events2, relEntityMap, - EMPTY_PRIMARY_FILTERS, EMPTY_MAP, store.getEntity(entityId2, entityType2, - EnumSet.allOf(Field.class))); + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2, + entityType2, EnumSet.allOf(Field.class))); + + verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES, + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId4, + entityType4, EnumSet.allOf(Field.class))); + + verifyEntityInfo(entityId5, entityType5, EMPTY_EVENTS, relEntityMap2, + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId5, + entityType5, EnumSet.allOf(Field.class))); // test getting single fields verifyEntityInfo(entityId1, entityType1, events1, null, null, null, @@ -276,70 +310,132 @@ public class TimelineStoreTestUtils { EnumSet.of(Field.RELATED_ENTITIES))); } + protected List getEntities(String entityType) + throws IOException { + return store.getEntities(entityType, null, null, null, null, null, + null, null, null).getEntities(); + } + + protected List getEntitiesWithPrimaryFilter( + String entityType, NameValuePair primaryFilter) throws IOException { + return store.getEntities(entityType, null, null, null, null, null, + primaryFilter, null, null).getEntities(); + } + + protected List getEntitiesFromId(String entityType, + String fromId) throws IOException { + return store.getEntities(entityType, null, null, null, fromId, null, + null, null, null).getEntities(); + } + + protected List getEntitiesFromTs(String entityType, + long fromTs) throws IOException { + return store.getEntities(entityType, null, null, null, null, fromTs, + null, null, null).getEntities(); + } + + protected List getEntitiesFromIdWithPrimaryFilter( + String entityType, NameValuePair primaryFilter, String fromId) + throws IOException { + return store.getEntities(entityType, null, null, null, fromId, null, + primaryFilter, null, null).getEntities(); + } + + protected List getEntitiesFromTsWithPrimaryFilter( + String entityType, NameValuePair primaryFilter, long fromTs) + throws IOException { + return store.getEntities(entityType, null, null, null, null, fromTs, + primaryFilter, null, null).getEntities(); + } + + protected List getEntitiesFromIdWithWindow(String entityType, + Long windowEnd, String fromId) throws IOException { + return store.getEntities(entityType, null, null, windowEnd, fromId, null, + null, null, null).getEntities(); + } + + protected List getEntitiesFromIdWithPrimaryFilterAndWindow( + String entityType, Long windowEnd, String fromId, + NameValuePair primaryFilter) throws IOException { + return store.getEntities(entityType, null, null, windowEnd, fromId, null, + primaryFilter, null, null).getEntities(); + } + + protected List getEntitiesWithFilters(String entityType, + NameValuePair primaryFilter, Collection secondaryFilters) + throws IOException { + return store.getEntities(entityType, null, null, null, null, null, + primaryFilter, secondaryFilters, null).getEntities(); + } + + protected List getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, NameValuePair primaryFilter, + EnumSet fields) throws IOException { + return store.getEntities(entityType, limit, windowStart, windowEnd, null, + null, primaryFilter, null, fields).getEntities(); + } + public void testGetEntities() throws IOException { // test getting entities assertEquals("nonzero entities size for nonexistent type", 0, - store.getEntities("type_0", null, null, null, null, null, - null).getEntities().size()); + getEntities("type_0").size()); assertEquals("nonzero entities size for nonexistent type", 0, - store.getEntities("type_3", null, null, null, null, null, - null).getEntities().size()); + getEntities("type_3").size()); assertEquals("nonzero entities size for nonexistent type", 0, - store.getEntities("type_0", null, null, null, userFilter, - null, null).getEntities().size()); + getEntities("type_6").size()); assertEquals("nonzero entities size for nonexistent type", 0, - store.getEntities("type_3", null, null, null, userFilter, - null, null).getEntities().size()); + getEntitiesWithPrimaryFilter("type_0", userFilter).size()); + assertEquals("nonzero entities size for nonexistent type", 0, + getEntitiesWithPrimaryFilter("type_3", userFilter).size()); + assertEquals("nonzero entities size for nonexistent type", 0, + getEntitiesWithPrimaryFilter("type_6", userFilter).size()); - List entities = - store.getEntities("type_1", null, null, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + List entities = getEntities("type_1"); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - entities = store.getEntities("type_2", null, null, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_2"); assertEquals(1, entities.size()); verifyEntityInfo(entityId2, entityType2, events2, relEntityMap, EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0)); - entities = store.getEntities("type_1", 1l, null, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", 1l, null, null, null, + EnumSet.allOf(Field.class)); assertEquals(1, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); - entities = store.getEntities("type_1", 1l, 0l, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", 1l, 0l, null, null, + EnumSet.allOf(Field.class)); assertEquals(1, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); - entities = store.getEntities("type_1", null, 234l, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, 234l, null, null, + EnumSet.allOf(Field.class)); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", null, 123l, null, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, 123l, null, null, + EnumSet.allOf(Field.class)); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", null, 234l, 345l, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, 234l, 345l, null, + EnumSet.allOf(Field.class)); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", null, null, 345l, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, null, 345l, null, + EnumSet.allOf(Field.class)); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - entities = store.getEntities("type_1", null, null, 123l, null, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, null, 123l, null, + EnumSet.allOf(Field.class)); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); @@ -347,79 +443,152 @@ public class TimelineStoreTestUtils { primaryFilters, otherInfo, entities.get(1)); } + public void testGetEntitiesWithFromId() throws IOException { + List entities = getEntitiesFromId("type_1", entityId1); + assertEquals(2, entities.size()); + verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = getEntitiesFromId("type_1", entityId1b); + assertEquals(1, entities.size()); + verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + entities = getEntitiesFromIdWithWindow("type_1", 0l, entityId1); + assertEquals(0, entities.size()); + + entities = getEntitiesFromId("type_2", "a"); + assertEquals(0, entities.size()); + + entities = getEntitiesFromId("type_2", entityId2); + assertEquals(1, entities.size()); + verifyEntityInfo(entityId2, entityType2, events2, relEntityMap, + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0)); + + entities = getEntitiesFromIdWithWindow("type_2", -456l, null); + assertEquals(0, entities.size()); + + entities = getEntitiesFromIdWithWindow("type_2", -456l, "a"); + assertEquals(0, entities.size()); + + entities = getEntitiesFromIdWithWindow("type_2", 0l, null); + assertEquals(1, entities.size()); + + entities = getEntitiesFromIdWithWindow("type_2", 0l, entityId2); + assertEquals(1, entities.size()); + + // same tests with primary filters + entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter, + entityId1); + assertEquals(2, entities.size()); + verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter, + entityId1b); + assertEquals(1, entities.size()); + verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + entities = getEntitiesFromIdWithPrimaryFilterAndWindow("type_1", 0l, + entityId1, userFilter); + assertEquals(0, entities.size()); + + entities = getEntitiesFromIdWithPrimaryFilter("type_2", userFilter, "a"); + assertEquals(0, entities.size()); + } + + public void testGetEntitiesWithFromTs() throws IOException { + assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size()); + assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + beforeTs).size()); + long afterTs = System.currentTimeMillis(); + assertEquals(2, getEntitiesFromTs("type_1", afterTs).size()); + assertEquals(1, getEntitiesFromTs("type_2", afterTs).size()); + assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + afterTs).size()); + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); + assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); + // check insert time is not overwritten + long beforeTs = this.beforeTs; + loadTestData(); + assertEquals(0, getEntitiesFromTs("type_1", beforeTs).size()); + assertEquals(0, getEntitiesFromTs("type_2", beforeTs).size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + beforeTs).size()); + assertEquals(2, getEntitiesFromTs("type_1", afterTs).size()); + assertEquals(1, getEntitiesFromTs("type_2", afterTs).size()); + assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + afterTs).size()); + } + public void testGetEntitiesWithPrimaryFilters() throws IOException { // test using primary filter assertEquals("nonzero entities size for primary filter", 0, - store.getEntities("type_1", null, null, null, - new NameValuePair("none", "none"), null, - EnumSet.allOf(Field.class)).getEntities().size()); + getEntitiesWithPrimaryFilter("type_1", + new NameValuePair("none", "none")).size()); assertEquals("nonzero entities size for primary filter", 0, - store.getEntities("type_2", null, null, null, - new NameValuePair("none", "none"), null, - EnumSet.allOf(Field.class)).getEntities().size()); + getEntitiesWithPrimaryFilter("type_2", + new NameValuePair("none", "none")).size()); assertEquals("nonzero entities size for primary filter", 0, - store.getEntities("type_3", null, null, null, - new NameValuePair("none", "none"), null, - EnumSet.allOf(Field.class)).getEntities().size()); + getEntitiesWithPrimaryFilter("type_3", + new NameValuePair("none", "none")).size()); - List entities = store.getEntities("type_1", null, null, null, - userFilter, null, EnumSet.allOf(Field.class)).getEntities(); + List entities = getEntitiesWithPrimaryFilter("type_1", + userFilter); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - store.getEntities("type_1", null, null, null, - numericFilter1, null, EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithPrimaryFilter("type_1", numericFilter1); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - store.getEntities("type_1", null, null, null, - numericFilter2, null, EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithPrimaryFilter("type_1", numericFilter2); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - store.getEntities("type_1", null, null, null, - numericFilter3, null, EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithPrimaryFilter("type_1", numericFilter3); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - entities = store.getEntities("type_2", null, null, null, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithPrimaryFilter("type_2", userFilter); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", 1l, null, null, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", 1l, null, null, userFilter, null); assertEquals(1, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); - entities = store.getEntities("type_1", 1l, 0l, null, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", 1l, 0l, null, userFilter, null); assertEquals(1, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); - entities = store.getEntities("type_1", null, 234l, null, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, 234l, null, userFilter, null); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", null, 234l, 345l, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, 234l, 345l, userFilter, null); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", null, null, 345l, userFilter, null, - EnumSet.allOf(Field.class)).getEntities(); + entities = getEntities("type_1", null, null, 345l, userFilter, null); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); @@ -429,28 +598,29 @@ public class TimelineStoreTestUtils { public void testGetEntitiesWithSecondaryFilters() throws IOException { // test using secondary filter - List entities = store.getEntities("type_1", null, null, null, - null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + List entities = getEntitiesWithFilters("type_1", null, + goodTestingFilters); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - entities = store.getEntities("type_1", null, null, null, userFilter, - goodTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithFilters("type_1", userFilter, goodTestingFilters); assertEquals(2, entities.size()); verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(0)); verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1)); - entities = store.getEntities("type_1", null, null, null, null, - badTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithFilters("type_1", null, + Collections.singleton(new NameValuePair("user", "none"))); assertEquals(0, entities.size()); - entities = store.getEntities("type_1", null, null, null, userFilter, - badTestingFilters, EnumSet.allOf(Field.class)).getEntities(); + entities = getEntitiesWithFilters("type_1", null, badTestingFilters); + assertEquals(0, entities.size()); + + entities = getEntitiesWithFilters("type_1", userFilter, badTestingFilters); assertEquals(0, entities.size()); } @@ -514,6 +684,19 @@ public class TimelineStoreTestUtils { verifyEntityTimeline(timelines.get(0), entityId2, entityType2, ev3, ev4); } + /** + * Verify a single entity and its start time + */ + protected static void verifyEntityInfo(String entityId, String entityType, + List events, Map> relatedEntities, + Map> primaryFilters, Map otherInfo, + Long startTime, TimelineEntity retrievedEntityInfo) { + + verifyEntityInfo(entityId, entityType, events, relatedEntities, + primaryFilters, otherInfo, retrievedEntityInfo); + assertEquals(startTime, retrievedEntityInfo.getStartTime()); + } + /** * Verify a single entity */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java index ee83e4083e9..650184bd517 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java @@ -50,6 +50,7 @@ import com.sun.jersey.test.framework.WebAppDescriptor; public class TestTimelineWebServices extends JerseyTest { private static TimelineStore store; + private long beforeTime; private Injector injector = Guice.createInjector(new ServletModule() { @@ -79,6 +80,7 @@ public class TestTimelineWebServices extends JerseyTest { private TimelineStore mockTimelineStore() throws Exception { + beforeTime = System.currentTimeMillis() - 1; TestMemoryTimelineStore store = new TestMemoryTimelineStore(); store.setup(); @@ -141,6 +143,47 @@ public class TestTimelineWebServices extends JerseyTest { verifyEntities(response.getEntity(TimelineEntities.class)); } + @Test + public void testFromId() throws Exception { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("timeline") + .path("type_1").queryParam("fromId", "id_2") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + assertEquals(1, response.getEntity(TimelineEntities.class).getEntities() + .size()); + + response = r.path("ws").path("v1").path("timeline") + .path("type_1").queryParam("fromId", "id_1") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + assertEquals(2, response.getEntity(TimelineEntities.class).getEntities() + .size()); + } + + @Test + public void testFromTs() throws Exception { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("timeline") + .path("type_1").queryParam("fromTs", Long.toString(beforeTime)) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + assertEquals(0, response.getEntity(TimelineEntities.class).getEntities() + .size()); + + response = r.path("ws").path("v1").path("timeline") + .path("type_1").queryParam("fromTs", Long.toString( + System.currentTimeMillis())) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + assertEquals(2, response.getEntity(TimelineEntities.class).getEntities() + .size()); + } + @Test public void testPrimaryFilterString() { WebResource r = resource();