YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. Contributed by Billie Rinaldi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1565868 13f79535-47bb-0310-9956-ffa450edef68
Zhijie Shen 2014-02-08 02:15:46 +00:00
parent d57c6e0fe7
commit 23b2e43f5d
19 changed files with 1431 additions and 41 deletions

View File

@@ -784,6 +784,12 @@
<artifactId>grizzly-http-servlet</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -116,6 +116,9 @@ Release 2.4.0 - UNRELEASED
YARN-1566. Changed Distributed Shell to retain containers across application
attempts. (Jian He via vinodkv)
YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie
Rinaldi via zjshen)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@@ -94,9 +94,21 @@ public class ATSPutErrors {
@Public
@Unstable
public static class ATSPutError {
/**
* Error code returned when no start time can be found when putting an
* entity. This occurs when the entity does not already exist in the
* store and it is put with no start time or events specified.
*/
public static final int NO_START_TIME = 1;
/**
* Error code returned if an IOException is encountered when putting an
* entity.
*/
public static final int IO_EXCEPTION = 2;
private String entityId;
private String entityType;
private Integer errorCode;
private int errorCode;
/**
* Get the entity Id
@@ -144,7 +156,7 @@ public class ATSPutErrors {
* @return an error code
*/
@XmlElement(name = "errorcode")
public Integer getErrorCode() {
public int getErrorCode() {
return errorCode;
}
@@ -154,7 +166,7 @@ public class ATSPutErrors {
* @param errorCode
* an error code
*/
public void setErrorCode(Integer errorCode) {
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
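The new constants give clients something concrete to branch on when a put partially fails. A minimal sketch of checking a put response, assuming response is an ATSPutErrors returned by a store or by the web service (imports omitted; the handling itself is illustrative, not part of this patch):

  // Inspect per-entity errors from a put. Other entities in the same
  // batch may still have been stored successfully.
  for (ATSPutError error : response.getErrors()) {
    if (error.getErrorCode() == ATSPutError.NO_START_TIME) {
      // a new entity was put with neither a start time nor any events
    } else if (error.getErrorCode() == ATSPutError.IO_EXCEPTION) {
      // the store hit an IOException while writing this entity
    }
  }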

View File

@@ -1041,6 +1041,10 @@ public class YarnConfiguration extends Configuration {
/** ATS store class */
public static final String ATS_STORE = ATS_PREFIX + "store.class";
/** ATS leveldb path */
public static final String ATS_LEVELDB_PATH_PROPERTY =
ATS_PREFIX + "leveldb-apptimeline-store.path";
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@@ -1145,7 +1145,13 @@
<property>
<description>Store class name for application timeline store</description>
<name>yarn.ats.store.class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.MemoryApplicationTimelineStore</value>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore</value>
</property>
<property>
<description>Store file name for leveldb application timeline store</description>
<name>yarn.ats.leveldb-apptimeline-store.path</name>
<value>${yarn.log.dir}/ats</value>
</property>
<!-- Other configuration -->
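The same settings can also be applied programmatically, which can be handy in tests. A small sketch assuming an explicitly chosen leveldb directory (the path is only an example, not a shipped default; imports omitted):

  Configuration conf = new YarnConfiguration();
  // Select the leveldb-backed store and point it at an explicit directory.
  conf.set(YarnConfiguration.ATS_STORE,
      "org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline"
      + ".LeveldbApplicationTimelineStore");
  conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY, "/tmp/ats-store");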

View File

@@ -117,14 +117,14 @@ public class TestApplicationTimelineRecords {
ATSPutError error1 = new ATSPutError();
error1.setEntityId("entity id 1");
error1.setEntityId("entity type 1");
error1.setErrorCode(1);
error1.setErrorCode(ATSPutError.NO_START_TIME);
atsPutErrors.addError(error1);
List<ATSPutError> errors = new ArrayList<ATSPutError>();
errors.add(error1);
ATSPutError error2 = new ATSPutError();
error2.setEntityId("entity id 2");
error2.setEntityId("entity type 2");
error2.setErrorCode(2);
error2.setErrorCode(ATSPutError.IO_EXCEPTION);
errors.add(error2);
atsPutErrors.addErrors(errors);

View File

@@ -168,5 +168,24 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
@@ -78,13 +79,15 @@ public interface ApplicationTimelineReader {
* retrieve (see {@link Field}). If the set of fields
* contains {@link Field#LAST_EVENT_ONLY} and not
* {@link Field#EVENTS}, the most recent event for
* each entity is retrieved.
* each entity is retrieved. If null, retrieves all
* fields.
* @return An {@link ATSEntities} object.
* @throws IOException
*/
ATSEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve);
EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
* This method retrieves the entity information for a given entity.
@@ -95,11 +98,13 @@ public interface ApplicationTimelineReader {
* retrieve (see {@link Field}). If the set of
* fields contains {@link Field#LAST_EVENT_ONLY} and
* not {@link Field#EVENTS}, the most recent event
* for each entity is retrieved.
* for each entity is retrieved. If null, retrieves
* all fields.
* @return An {@link ATSEntity} object.
* @throws IOException
*/
ATSEntity getEntity(String entity, String entityType, EnumSet<Field>
fieldsToRetrieve);
fieldsToRetrieve) throws IOException;
/**
* This method retrieves the events for a list of entities all of the same
@@ -118,8 +123,9 @@ public interface ApplicationTimelineReader {
* @param eventTypes Restricts the events returned to the given types. If
* null, events of all types will be returned.
* @return An {@link ATSEvents} object.
* @throws IOException
*/
ATSEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventTypes);
Long windowEnd, Set<String> eventTypes) throws IOException;
}
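With IOException now on every reader method, callers have to handle store-level failures explicitly. A hedged sketch of one query, assuming it sits in the same package as the interface and that imports are omitted (the entity type and filter values are invented for illustration):

  // Up to 100 entities of an assumed type that carry the "user" primary
  // filter, returning only primary filters to keep the payload small.
  static ATSEntities entitiesForUser(ApplicationTimelineReader reader,
      String user) throws IOException {
    return reader.getEntities("MAPREDUCE_JOB", 100L, null, null,
        new NameValuePair("user", user), null,
        EnumSet.of(Field.PRIMARY_FILTERS));
  }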

View File

@@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import java.io.IOException;
/**
* This interface is for storing application timeline information.
*/
@@ -37,7 +39,8 @@ public interface ApplicationTimelineWriter {
*
* @param data An {@link ATSEntities} object.
* @return An {@link ATSPutErrors} object.
* @throws IOException
*/
ATSPutErrors put(ATSEntities data);
ATSPutErrors put(ATSEntities data) throws IOException;
}

View File

@@ -26,12 +26,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
*/
@Private
@Unstable
public class EntityId implements Comparable<EntityId> {
public class EntityIdentifier implements Comparable<EntityIdentifier> {
private String id;
private String type;
public EntityId(String id, String type) {
public EntityIdentifier(String id, String type) {
this.id = id;
this.type = type;
}
@@ -53,7 +53,7 @@ public class EntityId implements Comparable<EntityId> {
}
@Override
public int compareTo(EntityId other) {
public int compareTo(EntityIdentifier other) {
int c = type.compareTo(other.type);
if (c != 0) return c;
return id.compareTo(other.id);
@@ -78,7 +78,7 @@ public class EntityId implements Comparable<EntityId> {
return false;
if (getClass() != obj.getClass())
return false;
EntityId other = (EntityId) obj;
EntityIdentifier other = (EntityIdentifier) obj;
if (id == null) {
if (other.id != null)
return false;

View File

@@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableUtils;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* A utility class providing methods for serializing and deserializing
* objects. The {@link #write(Object)}, {@link #read(byte[])} and {@link
* #write(java.io.DataOutputStream, Object)}, {@link
* #read(java.io.DataInputStream)} methods are used by the
* {@link LeveldbApplicationTimelineStore} to store and retrieve arbitrary
* JSON, while the {@link #writeReverseOrderedLong} and {@link
* #readReverseOrderedLong} methods are used to sort entities in descending
* start time order.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class GenericObjectMapper {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final byte LONG = 0x1;
private static final byte INTEGER = 0x2;
private static final byte DOUBLE = 0x3;
private static final byte STRING = 0x4;
private static final byte BOOLEAN = 0x5;
private static final byte LIST = 0x6;
private static final byte MAP = 0x7;
/**
* Serializes an Object into a byte array. Along with {@link #read(byte[]) },
* can be used to serialize an Object and deserialize it into an Object of
* the same type without needing to specify the Object's type,
* as long as it is one of the JSON-compatible objects Long, Integer,
* Double, String, Boolean, List, or Map. The current implementation uses
* ObjectMapper to serialize complex objects (List and Map) while using
* Writable to serialize simpler objects, to produce fewer bytes.
*
* @param o An Object
* @return A byte array representation of the Object
* @throws IOException
*/
public static byte[] write(Object o) throws IOException {
if (o == null)
return EMPTY_BYTES;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
write(new DataOutputStream(baos), o);
return baos.toByteArray();
}
/**
* Serializes an Object and writes it to a DataOutputStream. Along with
* {@link #read(java.io.DataInputStream)}, can be used to serialize an Object
* and deserialize it into an Object of the same type without needing to
* specify the Object's type, as long as it is one of the JSON-compatible
* objects Long, Integer, Double, String, Boolean, List, or Map. The current
* implementation uses ObjectMapper to serialize complex objects (List and
* Map) while using Writable to serialize simpler objects, to produce fewer
* bytes.
*
* @param dos A DataOutputStream
* @param o An Object
* @throws IOException
*/
public static void write(DataOutputStream dos, Object o)
throws IOException {
if (o == null)
return;
if (o instanceof Long) {
dos.write(LONG);
WritableUtils.writeVLong(dos, (Long) o);
} else if(o instanceof Integer) {
dos.write(INTEGER);
WritableUtils.writeVInt(dos, (Integer) o);
} else if(o instanceof Double) {
dos.write(DOUBLE);
dos.writeDouble((Double) o);
} else if (o instanceof String) {
dos.write(STRING);
WritableUtils.writeString(dos, (String) o);
} else if (o instanceof Boolean) {
dos.write(BOOLEAN);
dos.writeBoolean((Boolean) o);
} else if (o instanceof List) {
dos.write(LIST);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(dos, o);
} else if (o instanceof Map) {
dos.write(MAP);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(dos, o);
} else {
throw new IOException("Couldn't serialize object");
}
}
/**
* Deserializes an Object from a byte array created with
* {@link #write(Object)}.
*
* @param b A byte array
* @return An Object
* @throws IOException
*/
public static Object read(byte[] b) throws IOException {
if (b == null || b.length == 0)
return null;
ByteArrayInputStream bais = new ByteArrayInputStream(b);
return read(new DataInputStream(bais));
}
/**
* Reads an Object from a DataInputStream whose data has been written with
* {@link #write(java.io.DataOutputStream, Object)}.
*
* @param dis A DataInputStream
* @return An Object, null if an unrecognized type
* @throws IOException
*/
public static Object read(DataInputStream dis) throws IOException {
byte code = (byte)dis.read();
ObjectMapper mapper;
switch (code) {
case LONG:
return WritableUtils.readVLong(dis);
case INTEGER:
return WritableUtils.readVInt(dis);
case DOUBLE:
return dis.readDouble();
case STRING:
return WritableUtils.readString(dis);
case BOOLEAN:
return dis.readBoolean();
case LIST:
mapper = new ObjectMapper();
return mapper.readValue(dis, ArrayList.class);
case MAP:
mapper = new ObjectMapper();
return mapper.readValue(dis, HashMap.class);
default:
return null;
}
}
/**
* Converts a long to a 8-byte array so that lexicographic ordering of the
* produced byte arrays sort the longs in descending order.
*
* @param l A long
* @return A byte array
*/
public static byte[] writeReverseOrderedLong(long l) {
byte[] b = new byte[8];
b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff));
for (int i = 1; i < 7; i++)
b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
b[7] = (byte)(0xff ^ (l & 0xff));
return b;
}
/**
* Reads 8 bytes from an array starting at the specified offset and
* converts them to a long. The bytes are assumed to have been created
* with {@link #writeReverseOrderedLong}.
*
* @param b A byte array
* @param offset An offset into the byte array
* @return A long
*/
public static long readReverseOrderedLong(byte[] b, int offset) {
long l = b[offset] & 0xff;
for (int i = 1; i < 8; i++) {
l = l << 8;
l = l | (b[offset+i]&0xff);
}
return l ^ 0x7fffffffffffffffl;
}
}
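A minimal round-trip sketch for the mapper, assuming values stay within the JSON-compatible types listed in the javadoc (imports omitted):

  static void roundTripExample() throws IOException {
    Map<String, Object> info = new HashMap<String, Object>();
    info.put("attempts", 3);
    info.put("queue", "default");
    // Serialize and deserialize without declaring the value type up front.
    byte[] bytes = GenericObjectMapper.write(info);
    Object copy = GenericObjectMapper.read(bytes); // a Map again

    // Reverse-ordered longs: a later timestamp yields a byte array that
    // sorts lexicographically before an earlier one, so newest-first scans
    // over leveldb keys become plain forward iterations.
    byte[] newer = GenericObjectMapper.writeReverseOrderedLong(2000L);
    byte[] older = GenericObjectMapper.writeReverseOrderedLong(1000L);
    assert WritableComparator.compareBytes(newer, 0, 8, older, 0, 8) < 0;
    assert GenericObjectMapper.readReverseOrderedLong(newer, 0) == 2000L;
  }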

View File

@@ -0,0 +1,854 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import static org.apache.hadoop.yarn.server.applicationhistoryservice
.apptimeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.applicationhistoryservice
.apptimeline.GenericObjectMapper.writeReverseOrderedLong;
/**
* An implementation of an application timeline store backed by leveldb.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LeveldbApplicationTimelineStore extends AbstractService
implements ApplicationTimelineStore {
private static final Log LOG = LogFactory
.getLog(LeveldbApplicationTimelineStore.class);
private static final String FILENAME = "leveldb-apptimeline-store.ldb";
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes();
private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
private static final byte[] RELATED_COLUMN = "r".getBytes();
private static final byte[] TIME_COLUMN = "t".getBytes();
private static final byte[] EMPTY_BYTES = new byte[0];
private static final int START_TIME_CACHE_SIZE = 10000;
@SuppressWarnings("unchecked")
private final Map<EntityIdentifier, Long> startTimeCache =
Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
private DB db;
public LeveldbApplicationTimelineStore() {
super(LeveldbApplicationTimelineStore.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
Options options = new Options();
options.createIfMissing(true);
JniDBFactory factory = new JniDBFactory();
String path = conf.get(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY);
File p = new File(path);
if (!p.exists())
if (!p.mkdirs())
throw new IOException("Couldn't create directory for leveldb " +
"application timeline store " + path);
LOG.info("Using leveldb path " + path);
db = factory.open(new File(path, FILENAME), options);
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
IOUtils.cleanup(LOG, db);
super.serviceStop();
}
private static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
private boolean[] useSeparator;
private int index;
private int length;
public KeyBuilder(int size) {
b = new byte[size][];
useSeparator = new boolean[size];
index = 0;
length = 0;
}
public static KeyBuilder newInstance() {
return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
}
public KeyBuilder add(String s) {
return add(s.getBytes(), true);
}
public KeyBuilder add(byte[] t) {
return add(t, false);
}
public KeyBuilder add(byte[] t, boolean sep) {
b[index] = t;
useSeparator[index] = sep;
length += t.length;
if (sep)
length++;
index++;
return this;
}
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (i < index-1 && useSeparator[i])
baos.write(0x0);
}
return baos.toByteArray();
}
public byte[] getBytesForLookup() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (useSeparator[i])
baos.write(0x0);
}
return baos.toByteArray();
}
}
private static class KeyParser {
private final byte[] b;
private int offset;
public KeyParser(byte[] b, int offset) {
this.b = b;
this.offset = offset;
}
public String getNextString() throws IOException {
if (offset >= b.length)
throw new IOException(
"tried to read nonexistent string from byte array");
int i = 0;
while (offset+i < b.length && b[offset+i] != 0x0)
i++;
String s = new String(b, offset, i);
offset = offset + i + 1;
return s;
}
public long getNextLong() throws IOException {
if (offset+8 >= b.length)
throw new IOException("byte array ran out when trying to read long");
long l = readReverseOrderedLong(b, offset);
offset += 8;
return l;
}
public int getOffset() {
return offset;
}
}
@Override
public ATSEntity getEntity(String entity, String entityType,
EnumSet<Field> fields) throws IOException {
DBIterator iterator = null;
try {
byte[] revStartTime = getStartTime(entity, entityType, null, null, null);
if (revStartTime == null)
return null;
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entity).getBytesForLookup();
iterator = db.iterator();
iterator.seek(prefix);
return getEntity(entity, entityType,
readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix,
prefix.length);
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
/**
* Read entity from a db iterator. If no information is found in the
* specified fields for this entity, return null.
*/
private static ATSEntity getEntity(String entity, String entityType,
Long startTime, EnumSet<Field> fields, DBIterator iterator,
byte[] prefix, int prefixlen) throws IOException {
if (fields == null)
fields = EnumSet.allOf(Field.class);
ATSEntity atsEntity = new ATSEntity();
boolean events = false;
boolean lastEvent = false;
if (fields.contains(Field.EVENTS)) {
events = true;
atsEntity.setEvents(new ArrayList<ATSEvent>());
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
lastEvent = true;
atsEntity.setEvents(new ArrayList<ATSEvent>());
}
else {
atsEntity.setEvents(null);
}
boolean relatedEntities = false;
if (fields.contains(Field.RELATED_ENTITIES)) {
relatedEntities = true;
atsEntity.setRelatedEntities(new HashMap<String, List<String>>());
} else {
atsEntity.setRelatedEntities(null);
}
boolean primaryFilters = false;
if (fields.contains(Field.PRIMARY_FILTERS)) {
primaryFilters = true;
atsEntity.setPrimaryFilters(new HashMap<String, Object>());
} else {
atsEntity.setPrimaryFilters(null);
}
boolean otherInfo = false;
if (fields.contains(Field.OTHER_INFO)) {
otherInfo = true;
atsEntity.setOtherInfo(new HashMap<String, Object>());
} else {
atsEntity.setOtherInfo(null);
}
// iterate through the entity's entry, parsing information if it is part
// of a requested field
for (; iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefixlen, key))
break;
if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) {
if (primaryFilters) {
atsEntity.addPrimaryFilter(parseRemainingKey(key,
prefixlen + PRIMARY_FILTER_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
}
} else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
if (otherInfo) {
atsEntity.addOtherInfo(parseRemainingKey(key,
prefixlen + OTHER_INFO_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
}
} else if (key[prefixlen] == RELATED_COLUMN[0]) {
if (relatedEntities) {
addRelatedEntity(atsEntity, key,
prefixlen + RELATED_COLUMN.length);
}
} else if (key[prefixlen] == TIME_COLUMN[0]) {
if (events || (lastEvent && atsEntity.getEvents().size() == 0)) {
ATSEvent event = getEntityEvent(null, key, prefixlen +
TIME_COLUMN.length, iterator.peekNext().getValue());
if (event != null) {
atsEntity.addEvent(event);
}
}
} else {
LOG.warn(String.format("Found unexpected column for entity %s of " +
"type %s (0x%02x)", entity, entityType, key[prefixlen]));
}
}
atsEntity.setEntityId(entity);
atsEntity.setEntityType(entityType);
atsEntity.setStartTime(startTime);
return atsEntity;
}
@Override
public ATSEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventType) throws IOException {
ATSEvents atsEvents = new ATSEvents();
if (entityIds == null || entityIds.isEmpty())
return atsEvents;
// create a lexicographically-ordered map from start time to entities
Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
List<EntityIdentifier>>(new Comparator<byte[]>() {
@Override
public int compare(byte[] o1, byte[] o2) {
return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
o2.length);
}
});
DBIterator iterator = null;
try {
// look up start times for the specified entities
// skip entities with no start time
for (String entity : entityIds) {
byte[] startTime = getStartTime(entity, entityType, null, null, null);
if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) {
entities = new ArrayList<EntityIdentifier>();
startTimeMap.put(startTime, entities);
}
entities.add(new EntityIdentifier(entity, entityType));
}
}
for (Entry<byte[], List<EntityIdentifier>> entry :
startTimeMap.entrySet()) {
// look up the events matching the given parameters (limit,
// start time, end time, event types) for entities whose start times
// were found and add the entities to the return list
byte[] revStartTime = entry.getKey();
for (EntityIdentifier entity : entry.getValue()) {
ATSEventsOfOneEntity atsEntity = new ATSEventsOfOneEntity();
atsEntity.setEntityId(entity.getId());
atsEntity.setEntityType(entityType);
atsEvents.addEvent(atsEntity);
KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entity.getId())
.add(TIME_COLUMN);
byte[] prefix = kb.getBytesForLookup();
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
byte[] revts = writeReverseOrderedLong(windowEnd);
kb.add(revts);
byte[] first = kb.getBytesForLookup();
byte[] last = null;
if (windowStart != null) {
last = KeyBuilder.newInstance().add(prefix)
.add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
iterator = db.iterator();
for (iterator.seek(first); atsEntity.getEvents().size() < limit &&
iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0))
break;
ATSEvent event = getEntityEvent(eventType, key, prefix.length,
iterator.peekNext().getValue());
if (event != null)
atsEntity.addEvent(event);
}
}
}
} finally {
IOUtils.cleanup(LOG, iterator);
}
return atsEvents;
}
/**
* Returns true if the byte array begins with the specified prefix.
*/
private static boolean prefixMatches(byte[] prefix, int prefixlen,
byte[] b) {
if (b.length < prefixlen)
return false;
return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
prefixlen) == 0;
}
@Override
public ATSEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) throws IOException {
if (primaryFilter == null) {
// if no primary filter is specified, prefix the lookup with
// ENTITY_ENTRY_PREFIX
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
windowStart, windowEnd, secondaryFilters, fields);
} else {
// if a primary filter is specified, prefix the lookup with
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
// ENTITY_ENTRY_PREFIX
byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
.add(primaryFilter.getName())
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
secondaryFilters, fields);
}
}
/**
* Retrieves a list of entities satisfying given parameters.
*
* @param base A byte array prefix for the lookup
* @param entityType The type of the entity
* @param limit A limit on the number of entities to return
* @param starttime The earliest entity start time to retrieve (exclusive)
* @param endtime The latest entity start time to retrieve (inclusive)
* @param secondaryFilters Filter pairs that the entities should match
* @param fields The set of fields to retrieve
* @return A list of entities
* @throws IOException
*/
private ATSEntities getEntityByTime(byte[] base,
String entityType, Long limit, Long starttime, Long endtime,
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
throws IOException {
DBIterator iterator = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
// only db keys matching the prefix (base + entity type) will be parsed
byte[] prefix = kb.getBytesForLookup();
if (endtime == null) {
// if end time is null, place no restriction on end time
endtime = Long.MAX_VALUE;
}
// using end time, construct a first key that will be seeked to
byte[] revts = writeReverseOrderedLong(endtime);
kb.add(revts);
byte[] first = kb.getBytesForLookup();
byte[] last = null;
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
}
ATSEntities atsEntities = new ATSEntities();
iterator = db.iterator();
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
// or a start time has been specified and reached/exceeded
while (atsEntities.getEntities().size() < limit && iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0))
break;
// read the start time and entity from the current key
KeyParser kp = new KeyParser(key, prefix.length);
Long startTime = kp.getNextLong();
String entity = kp.getNextString();
// parse the entity that owns this key, iterating over all keys for
// the entity
ATSEntity atsEntity = getEntity(entity, entityType, startTime,
fields, iterator, key, kp.getOffset());
if (atsEntity == null)
continue;
// determine if the retrieved entity matches the provided secondary
// filters, and if so add it to the list of entities to return
boolean filterPassed = true;
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = atsEntity.getOtherInfo().get(filter.getName());
if (v == null)
v = atsEntity.getPrimaryFilters().get(filter.getName());
if (v == null || !v.equals(filter.getValue())) {
filterPassed = false;
break;
}
}
}
if (filterPassed)
atsEntities.addEntity(atsEntity);
}
return atsEntities;
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
/**
* Put a single entity. If there is an error, add a PutError to the given
* response.
*/
private void put(ATSEntity atsEntity, ATSPutErrors response) {
WriteBatch writeBatch = null;
try {
writeBatch = db.createWriteBatch();
List<ATSEvent> events = atsEntity.getEvents();
// look up the start time for the entity
byte[] revStartTime = getStartTime(atsEntity.getEntityId(),
atsEntity.getEntityType(), atsEntity.getStartTime(), events,
writeBatch);
if (revStartTime == null) {
// if no start time is found, add an error and return
ATSPutError error = new ATSPutError();
error.setEntityId(atsEntity.getEntityId());
error.setEntityType(atsEntity.getEntityType());
error.setErrorCode(ATSPutError.NO_START_TIME);
response.addError(error);
return;
}
Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
Map<String, Object> primaryFilters = atsEntity.getPrimaryFilters();
// write event entries
if (events != null && !events.isEmpty()) {
for (ATSEvent event : events) {
byte[] revts = writeReverseOrderedLong(event.getTimestamp());
byte[] key = createEntityEventKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, revts,
event.getEventType());
byte[] value = GenericObjectMapper.write(event.getEventInfo());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write related entity entries
Map<String,List<String>> relatedEntities =
atsEntity.getRelatedEntities();
if (relatedEntities != null && !relatedEntities.isEmpty()) {
for (Entry<String, List<String>> relatedEntityList :
relatedEntities.entrySet()) {
String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) {
// look up start time of related entity
byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
relatedEntityType, null, null, writeBatch);
if (relatedEntityStartTime == null) {
// if start time is not found, set start time of the related
// entity to the start time of this entity, and write it to the
// db and the cache
relatedEntityStartTime = revStartTime;
writeBatch.put(createStartTimeLookupKey(relatedEntityId,
relatedEntityType), relatedEntityStartTime);
startTimeCache.put(new EntityIdentifier(relatedEntityId,
relatedEntityType), revStartTimeLong);
}
// write reverse entry (related entity -> entity)
byte[] key = createReleatedEntityKey(relatedEntityId,
relatedEntityType, relatedEntityStartTime,
atsEntity.getEntityId(), atsEntity.getEntityType());
writeBatch.put(key, EMPTY_BYTES);
// TODO: write forward entry (entity -> related entity)?
}
}
}
// write primary filter entries
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Object> primaryFilter : primaryFilters.entrySet()) {
byte[] key = createPrimaryFilterKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, primaryFilter.getKey());
byte[] value = GenericObjectMapper.write(primaryFilter.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
// write other info entries
Map<String, Object> otherInfo = atsEntity.getOtherInfo();
if (otherInfo != null && !otherInfo.isEmpty()) {
for (Entry<String, Object> i : otherInfo.entrySet()) {
byte[] key = createOtherInfoKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, i.getKey());
byte[] value = GenericObjectMapper.write(i.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
}
}
db.write(writeBatch);
} catch (IOException e) {
LOG.error("Error putting entity " + atsEntity.getEntityId() +
" of type " + atsEntity.getEntityType(), e);
ATSPutError error = new ATSPutError();
error.setEntityId(atsEntity.getEntityId());
error.setEntityType(atsEntity.getEntityType());
error.setErrorCode(ATSPutError.IO_EXCEPTION);
response.addError(error);
} finally {
IOUtils.cleanup(LOG, writeBatch);
}
}
/**
* For a given key / value pair that has been written to the db,
* write additional entries to the db for each primary filter.
*/
private static void writePrimaryFilterEntries(WriteBatch writeBatch,
Map<String, Object> primaryFilters, byte[] key, byte[] value)
throws IOException {
if (primaryFilters != null && !primaryFilters.isEmpty()) {
for (Entry<String, Object> p : primaryFilters.entrySet()) {
writeBatch.put(addPrimaryFilterToKey(p.getKey(), p.getValue(),
key), value);
}
}
}
@Override
public ATSPutErrors put(ATSEntities atsEntities) {
ATSPutErrors response = new ATSPutErrors();
for (ATSEntity atsEntity : atsEntities.getEntities()) {
put(atsEntity, response);
}
return response;
}
/**
* Get the unique start time for a given entity as a byte array that sorts
* the timestamps in reverse order (see {@link
* GenericObjectMapper#writeReverseOrderedLong(long)}).
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @param startTime The start time of the entity, or null
* @param events A list of events for the entity, or null
* @param writeBatch A leveldb write batch, if the method is called by a
* put as opposed to a get
* @return A byte array
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType,
Long startTime, List<ATSEvent> events, WriteBatch writeBatch)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) {
// start time is not provided, so try to look it up
if (startTimeCache.containsKey(entity)) {
// found the start time in the cache
startTime = startTimeCache.get(entity);
} else {
// try to look up the start time in the db
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
byte[] v = db.get(b);
if (v == null) {
// did not find the start time in the db
// if this is a put, try to set it from the provided events
if (events == null || writeBatch == null) {
// no events, or not a put, so return null
return null;
}
Long min = Long.MAX_VALUE;
for (ATSEvent e : events)
if (min > e.getTimestamp())
min = e.getTimestamp();
startTime = min;
// selected start time as minimum timestamp of provided events
// write start time to db and cache
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
} else {
// found the start time in the db
startTime = readReverseOrderedLong(v, 0);
if (writeBatch != null) {
// if this is a put, re-add the start time to the cache
startTimeCache.put(entity, startTime);
}
}
}
} else {
// start time is provided
// TODO: verify start time in db as well as cache?
if (startTimeCache.containsKey(entity)) {
// if the start time is already in the cache,
// and it is different from the provided start time,
// use the one from the cache
if (!startTime.equals(startTimeCache.get(entity)))
startTime = startTimeCache.get(entity);
} else if (writeBatch != null) {
// if this is a put, write the provided start time to the db and the
// cache
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
}
}
return writeReverseOrderedLong(startTime);
}
/**
* Creates a key for looking up the start time of a given entity,
* of the form START_TIME_LOOKUP_PREFIX + entitytype + entity.
*/
private static byte[] createStartTimeLookupKey(String entity,
String entitytype) throws IOException {
return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
.add(entitytype).add(entity).getBytes();
}
/**
* Creates an index entry for the given key of the form
* INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
*/
private static byte[] addPrimaryFilterToKey(String primaryFilterName,
Object primaryFilterValue, byte[] key) throws IOException {
return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
.add(primaryFilterName)
.add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
.getBytes();
}
/**
* Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype +
* revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype.
*/
private static byte[] createEntityEventKey(String entity, String entitytype,
byte[] revStartTime, byte[] reveventtimestamp, String eventtype)
throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN)
.add(reveventtimestamp).add(eventtype).getBytes();
}
/**
* Creates an event object from the given key, offset, and value. If the
* event type is not contained in the specified set of event types,
* returns null.
*/
private static ATSEvent getEntityEvent(Set<String> eventTypes, byte[] key,
int offset, byte[] value) throws IOException {
KeyParser kp = new KeyParser(key, offset);
long ts = kp.getNextLong();
String tstype = kp.getNextString();
if (eventTypes == null || eventTypes.contains(tstype)) {
ATSEvent event = new ATSEvent();
event.setTimestamp(ts);
event.setEventType(tstype);
Object o = GenericObjectMapper.read(value);
if (o == null) {
event.setEventInfo(null);
} else if (o instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> m = (Map<String, Object>) o;
event.setEventInfo(m);
} else {
throw new IOException("Couldn't deserialize event info map");
}
return event;
}
return null;
}
/**
* Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
* entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name.
*/
private static byte[] createPrimaryFilterKey(String entity,
String entitytype, byte[] revStartTime, String name) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
.getBytes();
}
/**
* Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype +
* revstarttime + entity + OTHER_INFO_COLUMN + name.
*/
private static byte[] createOtherInfoKey(String entity, String entitytype,
byte[] revStartTime, String name) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name)
.getBytes();
}
/**
* Creates a string representation of the byte array from the given offset
* to the end of the array (for parsing other info keys).
*/
private static String parseRemainingKey(byte[] b, int offset) {
return new String(b, offset, b.length - offset);
}
/**
* Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
* entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype +
* relatedentity.
*/
private static byte[] createReleatedEntityKey(String entity,
String entitytype, byte[] revStartTime, String relatedEntity,
String relatedEntityType) throws IOException {
return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
.add(revStartTime).add(entity).add(RELATED_COLUMN)
.add(relatedEntityType).add(relatedEntity).getBytes();
}
/**
* Parses the related entity from the given key at the given offset and
* adds it to the given entity.
*/
private static void addRelatedEntity(ATSEntity atsEntity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String type = kp.getNextString();
String id = kp.getNextString();
atsEntity.addRelatedEntity(type, id);
}
/**
* Clears the cache to test reloading start times from leveldb (only for
* testing).
*/
@VisibleForTesting
void clearStartTimeCache() {
startTimeCache.clear();
}
}
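A hedged end-to-end sketch of driving the store directly, for example from a test. The path, ids and event type are invented, imports are omitted, and the surrounding method is assumed to declare throws IOException; in the history daemon the store is instead instantiated from yarn.ats.store.class.

  YarnConfiguration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY, "/tmp/ats-example");
  LeveldbApplicationTimelineStore store = new LeveldbApplicationTimelineStore();
  store.init(conf);
  store.start();
  try {
    ATSEntity entity = new ATSEntity();
    entity.setEntityId("app_0001");
    entity.setEntityType("YARN_APPLICATION");
    ATSEvent event = new ATSEvent();
    event.setEventType("STARTED");
    event.setTimestamp(System.currentTimeMillis());
    entity.addEvent(event); // the earliest event also supplies the start time
    ATSEntities batch = new ATSEntities();
    batch.addEntity(entity);
    ATSPutErrors errors = store.put(batch); // empty error list on success
    ATSEntity stored = store.getEntity("app_0001", "YARN_APPLICATION",
        EnumSet.allOf(Field.class)); // read it back with all fields
  } finally {
    store.stop();
  }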

View File

@@ -53,8 +53,8 @@ import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
public class MemoryApplicationTimelineStore
extends AbstractService implements ApplicationTimelineStore {
private Map<EntityId, ATSEntity> entities =
new HashMap<EntityId, ATSEntity>();
private Map<EntityIdentifier, ATSEntity> entities =
new HashMap<EntityIdentifier, ATSEntity>();
public MemoryApplicationTimelineStore() {
super(MemoryApplicationTimelineStore.class.getName());
@@ -125,7 +125,7 @@ public class MemoryApplicationTimelineStore
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
}
ATSEntity entity = entities.get(new EntityId(entityId, entityType));
ATSEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
if (entity == null) {
return null;
} else {
@@ -152,7 +152,7 @@ public class MemoryApplicationTimelineStore
windowEnd = Long.MAX_VALUE;
}
for (String entityId : entityIds) {
EntityId entityID = new EntityId(entityId, entityType);
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
ATSEntity entity = entities.get(entityID);
if (entity == null) {
continue;
@@ -184,8 +184,8 @@ public class MemoryApplicationTimelineStore
public ATSPutErrors put(ATSEntities data) {
ATSPutErrors errors = new ATSPutErrors();
for (ATSEntity entity : data.getEntities()) {
EntityId entityId =
new EntityId(entity.getEntityId(), entity.getEntityType());
EntityIdentifier entityId =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// store entity info in memory
ATSEntity existingEntity = entities.get(entityId);
if (existingEntity == null) {
@@ -210,7 +210,7 @@ public class MemoryApplicationTimelineStore
ATSPutError error = new ATSPutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(1);
error.setErrorCode(ATSPutError.NO_START_TIME);
errors.addError(error);
entities.remove(entityId);
continue;
@@ -242,12 +242,20 @@ public class MemoryApplicationTimelineStore
continue;
}
for (String idStr : partRelatedEntities.getValue()) {
EntityId relatedEntityId =
new EntityId(idStr, partRelatedEntities.getKey());
EntityIdentifier relatedEntityId =
new EntityIdentifier(idStr, partRelatedEntities.getKey());
ATSEntity relatedEntity = entities.get(relatedEntityId);
if (relatedEntity != null) {
relatedEntity.addRelatedEntity(
existingEntity.getEntityType(), existingEntity.getEntityId());
} else {
relatedEntity = new ATSEntity();
relatedEntity.setEntityId(relatedEntityId.getId());
relatedEntity.setEntityType(relatedEntityId.getType());
relatedEntity.setStartTime(existingEntity.getStartTime());
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
existingEntity.getEntityId());
entities.put(relatedEntityId, relatedEntity);
}
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
@@ -45,6 +46,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
@@ -64,6 +67,8 @@ import com.google.inject.Singleton;
//TODO: support XML serialization/deserialization
public class ATSWebServices {
private static final Log LOG = LogFactory.getLog(ATSWebServices.class);
private ApplicationTimelineStore store;
@Inject
@@ -143,6 +148,10 @@ public class ATSWebServices {
"windowStart, windowEnd or limit is not a numeric value.");
} catch (IllegalArgumentException e) {
throw new BadRequestException("requested invalid field.");
} catch (IOException e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (entities == null) {
return new ATSEntities();
@@ -171,6 +180,10 @@ public class ATSWebServices {
} catch (IllegalArgumentException e) {
throw new BadRequestException(
"requested invalid field.");
} catch (IOException e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (entity == null) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
@@ -206,6 +219,10 @@ public class ATSWebServices {
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
} catch (IOException e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (events == null) {
return new ATSEvents();
@@ -228,7 +245,13 @@ public class ATSWebServices {
if (entities == null) {
return new ATSPutErrors();
}
try {
return store.put(entities);
} catch (IOException e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
Response.Status.INTERNAL_SERVER_ERROR);
}
}
private void init(HttpServletResponse response) {
@@ -275,7 +298,17 @@ public class ATSWebServices {
String[] strs = str.split(delimiter);
List<Field> fieldList = new ArrayList<Field>();
for (String s : strs) {
fieldList.add(Field.valueOf(s.toUpperCase()));
s = s.trim().toUpperCase();
if (s.equals("EVENTS"))
fieldList.add(Field.EVENTS);
else if (s.equals("LASTEVENTONLY"))
fieldList.add(Field.LAST_EVENT_ONLY);
else if (s.equals("RELATEDENTITIES"))
fieldList.add(Field.RELATED_ENTITIES);
else if (s.equals("PRIMARYFILTERS"))
fieldList.add(Field.PRIMARY_FILTERS);
else if (s.equals("OTHERINFO"))
fieldList.add(Field.OTHER_INFO);
}
if (fieldList.size() == 0)
return null;

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -71,7 +73,7 @@ public class ApplicationTimelineStoreTestUtils {
/** /**
* Load test data into the given store * Load test data into the given store
*/ */
protected void loadTestData() { protected void loadTestData() throws IOException {
ATSEntities atsEntities = new ATSEntities(); ATSEntities atsEntities = new ATSEntities();
Map<String, Object> primaryFilters = new HashMap<String, Object>(); Map<String, Object> primaryFilters = new HashMap<String, Object>();
primaryFilters.put("user", "username"); primaryFilters.put("user", "username");
@ -126,7 +128,7 @@ public class ApplicationTimelineStoreTestUtils {
response = store.put(atsEntities); response = store.put(atsEntities);
assertEquals(0, response.getErrors().size()); assertEquals(0, response.getErrors().size());
atsEntities.setEntities(Collections.singletonList(createEntity(entity1b, atsEntities.setEntities(Collections.singletonList(createEntity(entity1b,
entityType1, 123l, Collections.singletonList(ev2), null, entityType1, 789l, Collections.singletonList(ev2), null,
primaryFilters, otherInfo2))); primaryFilters, otherInfo2)));
response = store.put(atsEntities); response = store.put(atsEntities);
assertEquals(0, response.getErrors().size()); assertEquals(0, response.getErrors().size());
@ -138,11 +140,11 @@ public class ApplicationTimelineStoreTestUtils {
ATSPutError error = response.getErrors().get(0); ATSPutError error = response.getErrors().get(0);
assertEquals("badentityid", error.getEntityId()); assertEquals("badentityid", error.getEntityId());
assertEquals("badentity", error.getEntityType()); assertEquals("badentity", error.getEntityType());
assertEquals((Integer) 1, error.getErrorCode()); assertEquals(ATSPutError.NO_START_TIME, error.getErrorCode());
} }
/** /**
* Load veification data * Load verification data
*/ */
protected void loadVerificationData() throws Exception { protected void loadVerificationData() throws Exception {
userFilter = new NameValuePair("user", userFilter = new NameValuePair("user",
@ -197,7 +199,7 @@ public class ApplicationTimelineStoreTestUtils {
events2.add(ev4); events2.add(ev4);
} }
public void testGetSingleEntity() { public void testGetSingleEntity() throws IOException {
// test getting entity info // test getting entity info
verifyEntityInfo(null, null, null, null, null, null, verifyEntityInfo(null, null, null, null, null, null,
store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class))); store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
@ -222,6 +224,10 @@ public class ApplicationTimelineStoreTestUtils {
null, null, null, store.getEntity(entity1, entityType1, null, null, null, store.getEntity(entity1, entityType1,
EnumSet.of(Field.LAST_EVENT_ONLY))); EnumSet.of(Field.LAST_EVENT_ONLY)));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entity1b, entityType1,
null));
verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null, verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null,
store.getEntity(entity1, entityType1, store.getEntity(entity1, entityType1,
EnumSet.of(Field.PRIMARY_FILTERS))); EnumSet.of(Field.PRIMARY_FILTERS)));
@ -234,7 +240,7 @@ public class ApplicationTimelineStoreTestUtils {
EnumSet.of(Field.RELATED_ENTITIES))); EnumSet.of(Field.RELATED_ENTITIES)));
} }
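The calls above cover the single-entity read path: the EnumSet<Field> argument selects which parts of the entity are populated, and the entity1b check confirms that passing null returns everything. A compact usage sketch built from those same calls:

    ATSEntity everything = store.getEntity(entity1, entityType1, null);
    ATSEntity eventsOnly = store.getEntity(entity1, entityType1,
        EnumSet.of(Field.EVENTS));
    ATSEntity lastEventAndFilters = store.getEntity(entity1, entityType1,
        EnumSet.of(Field.LAST_EVENT_ONLY, Field.PRIMARY_FILTERS));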
public void testGetEntities() { public void testGetEntities() throws IOException {
// test getting entities // test getting entities
assertEquals("nonzero entities size for nonexistent type", 0, assertEquals("nonzero entities size for nonexistent type", 0,
store.getEntities("type_0", null, null, null, null, null, store.getEntities("type_0", null, null, null, null, null,
@ -305,7 +311,7 @@ public class ApplicationTimelineStoreTestUtils {
primaryFilters, otherInfo, entities.get(1)); primaryFilters, otherInfo, entities.get(1));
} }
public void testGetEntitiesWithPrimaryFilters() { public void testGetEntitiesWithPrimaryFilters() throws IOException {
// test using primary filter // test using primary filter
assertEquals("nonzero entities size for primary filter", 0, assertEquals("nonzero entities size for primary filter", 0,
store.getEntities("type_1", null, null, null, store.getEntities("type_1", null, null, null,
@ -361,7 +367,7 @@ public class ApplicationTimelineStoreTestUtils {
primaryFilters, otherInfo, entities.get(1)); primaryFilters, otherInfo, entities.get(1));
} }
public void testGetEntitiesWithSecondaryFilters() { public void testGetEntitiesWithSecondaryFilters() throws IOException {
// test using secondary filter // test using secondary filter
List<ATSEntity> entities = store.getEntities("type_1", null, null, null, List<ATSEntity> entities = store.getEntities("type_1", null, null, null,
null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities(); null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
@ -388,7 +394,7 @@ public class ApplicationTimelineStoreTestUtils {
assertEquals(0, entities.size()); assertEquals(0, entities.size());
} }
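The query above is the general entity scan, and its positional arguments are easy to misread. Here is the same call with each parameter labelled; the labels are inferred from how the tests pass values and should be treated as assumptions rather than the documented signature:

    List<ATSEntity> hits = store.getEntities(
        "type_1",                      // entity type
        null,                          // limit on the number of entities returned
        null, null,                    // start/end of the time window
        null,                          // primary filter (a NameValuePair such as userFilter)
        goodTestingFilters,            // secondary filters
        EnumSet.allOf(Field.class))    // fields to retrieve
        .getEntities();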
public void testGetEvents() { public void testGetEvents() throws IOException {
// test getting entity timelines // test getting entity timelines
SortedSet<String> sortedSet = new TreeSet<String>(); SortedSet<String> sortedSet = new TreeSet<String>();
sortedSet.add(entity1); sortedSet.add(entity1);


@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparator;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestGenericObjectMapper {
@Test
public void testEncoding() {
testEncoding(Long.MAX_VALUE);
testEncoding(Long.MIN_VALUE);
testEncoding(0l);
testEncoding(128l);
testEncoding(256l);
testEncoding(512l);
testEncoding(-256l);
}
private static void testEncoding(long l) {
byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
assertEquals("error decoding", l,
GenericObjectMapper.readReverseOrderedLong(b, 0));
byte[] buf = new byte[16];
System.arraycopy(b, 0, buf, 5, 8);
assertEquals("error decoding at offset", l,
GenericObjectMapper.readReverseOrderedLong(buf, 5));
if (l > Long.MIN_VALUE) {
byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1);
assertEquals("error preserving ordering", 1,
WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
}
if (l < Long.MAX_VALUE) {
byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1);
assertEquals("error preserving ordering", 1,
WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
}
}
private static void verify(Object o) throws IOException {
assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
}
@Test
public void testValueTypes() throws IOException {
verify(42l);
verify(42);
verify(1.23);
verify("abc");
verify(true);
List<String> list = new ArrayList<String>();
list.add("123");
list.add("abc");
verify(list);
Map<String,String> map = new HashMap<String,String>();
map.put("k1","v1");
map.put("k2","v2");
verify(map);
}
}
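The test pins down two properties: write/read of a long round-trips, including at an offset into a larger buffer, and the encoded bytes of a larger long compare as smaller under WritableComparator's unsigned byte comparison, so that, for example, newer timestamps can sort ahead of older ones when embedded in leveldb keys. The encoder below is illustrative only, not necessarily the committed implementation, but it satisfies both properties: flip the sign bit so unsigned byte order matches signed long order, then complement to reverse it.

  static byte[] reverseOrderedBytes(long l) {
    long v = ~(l ^ Long.MIN_VALUE);            // flip sign bit, then invert all bits
    byte[] b = new byte[8];
    for (int i = 0; i < 8; i++) {
      b[i] = (byte) (v >>> (56 - 8 * i));      // big-endian layout
    }
    return b;
  }

  static long readReverseOrderedBytes(byte[] b, int offset) {
    long v = 0;
    for (int i = 0; i < 8; i++) {
      v = (v << 8) | (b[offset + i] & 0xffL);
    }
    return ~v ^ Long.MIN_VALUE;                // undo the inversion and the sign flip
  }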


@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestLeveldbApplicationTimelineStore
extends ApplicationTimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
@Before
public void setup() throws Exception {
fsContext = FileContext.getLocalFSFileContext();
Configuration conf = new Configuration();
fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY,
fsPath.getAbsolutePath());
store = new LeveldbApplicationTimelineStore();
store.init(conf);
store.start();
loadTestData();
loadVerificationData();
}
@After
public void tearDown() throws Exception {
store.stop();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
}
@Test
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
((LeveldbApplicationTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
}
@Test
public void testGetEntities() throws IOException {
super.testGetEntities();
}
@Test
public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters();
}
@Test
public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters();
}
@Test
public void testGetEvents() throws IOException {
super.testGetEvents();
}
}
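The @Before method above doubles as the shortest recipe for standing the leveldb store up outside a test: point YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY at a local directory, then drive the store through the normal service lifecycle. A minimal sketch (the directory path is just an example):

    Configuration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY,
        "/var/lib/hadoop-yarn/ats-leveldb");   // example location for the leveldb files
    LeveldbApplicationTimelineStore store = new LeveldbApplicationTimelineStore();
    store.init(conf);
    store.start();
    // ... put(ATSEntities) and getEntity/getEntities calls go here ...
    store.stop();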


@ -23,6 +23,7 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
public class TestMemoryApplicationTimelineStore public class TestMemoryApplicationTimelineStore
extends ApplicationTimelineStoreTestUtils { extends ApplicationTimelineStoreTestUtils {
@ -46,27 +47,27 @@ public class TestMemoryApplicationTimelineStore
} }
@Test @Test
public void testGetSingleEntity() { public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity(); super.testGetSingleEntity();
} }
@Test @Test
public void testGetEntities() { public void testGetEntities() throws IOException {
super.testGetEntities(); super.testGetEntities();
} }
@Test @Test
public void testGetEntitiesWithPrimaryFilters() { public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters(); super.testGetEntitiesWithPrimaryFilters();
} }
@Test @Test
public void testGetEntitiesWithSecondaryFilters() { public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters(); super.testGetEntitiesWithSecondaryFilters();
} }
@Test @Test
public void testGetEvents() { public void testGetEvents() throws IOException {
super.testGetEvents(); super.testGetEvents();
} }


@ -156,6 +156,43 @@ public class TestATSWebServices extends JerseyTest {
Assert.assertEquals(4, entity.getOtherInfo().size()); Assert.assertEquals(4, entity.getOtherInfo().size());
} }
@Test
public void testGetEntityFields1() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
.path("type_1").path("id_1").queryParam("fields", "events,otherinfo")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
Assert.assertEquals(123l, entity.getStartTime().longValue());
Assert.assertEquals(2, entity.getEvents().size());
Assert.assertEquals(0, entity.getPrimaryFilters().size());
Assert.assertEquals(4, entity.getOtherInfo().size());
}
@Test
public void testGetEntityFields2() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
.path("type_1").path("id_1").queryParam("fields", "lasteventonly," +
"primaryfilters,relatedentities")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
Assert.assertEquals(123l, entity.getStartTime().longValue());
Assert.assertEquals(1, entity.getEvents().size());
Assert.assertEquals(2, entity.getPrimaryFilters().size());
Assert.assertEquals(0, entity.getOtherInfo().size());
}
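Both new tests drive the same endpoint and differ only in the value of the fields parameter, which is what the parsing change earlier in this commit interprets. The same request issued with a plain Jersey client would look roughly like this; the base URL (host and port) is an assumption, while the path segments and the "fields" query parameter come straight from the tests:

    Client client = Client.create();
    WebResource ats = client.resource("http://localhost:8188/ws/v1/apptimeline");
    ClientResponse response = ats.path("type_1").path("id_1")
        .queryParam("fields", "events,otherinfo")  // or "lasteventonly,primaryfilters,relatedentities"
        .accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    ATSEntity entity = response.getEntity(ATSEntity.class);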
@Test @Test
public void testGetEvents() throws Exception { public void testGetEvents() throws Exception {
WebResource r = resource(); WebResource r = resource();