From 23b2e43f5d678517e33590d15dec73225b9c5682 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Sat, 8 Feb 2014 02:15:46 +0000 Subject: [PATCH] YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. Contributed by Billie Rinaldi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1565868 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-project/pom.xml | 6 + hadoop-yarn-project/CHANGES.txt | 3 + .../api/records/apptimeline/ATSPutErrors.java | 18 +- .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../src/main/resources/yarn-default.xml | 8 +- .../TestApplicationTimelineRecords.java | 4 +- .../pom.xml | 19 + .../ApplicationTimelineReader.java | 16 +- .../ApplicationTimelineWriter.java | 5 +- .../{EntityId.java => EntityIdentifier.java} | 8 +- .../apptimeline/GenericObjectMapper.java | 208 +++++ .../LeveldbApplicationTimelineStore.java | 854 ++++++++++++++++++ .../MemoryApplicationTimelineStore.java | 26 +- .../webapp/ATSWebServices.java | 37 +- .../ApplicationTimelineStoreTestUtils.java | 24 +- .../apptimeline/TestGenericObjectMapper.java | 89 ++ .../TestLeveldbApplicationTimelineStore.java | 95 ++ .../TestMemoryApplicationTimelineStore.java | 11 +- .../webapp/TestATSWebServices.java | 37 + 19 files changed, 1431 insertions(+), 41 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/{EntityId.java => EntityIdentifier.java} (91%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/GenericObjectMapper.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestGenericObjectMapper.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 3a6519c17c2..b5337f12619 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -784,6 +784,12 @@ grizzly-http-servlet 2.1.2 + + + org.fusesource.leveldbjni + leveldbjni-all + 1.8 + diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c3027f0bf84..3039c6f9a07 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -116,6 +116,9 @@ Release 2.4.0 - UNRELEASED YARN-1566. Changed Distributed Shell to retain containers across application attempts. (Jian He via vinodkv) + YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie + Rinaldi via zjshen) + IMPROVEMENTS YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSPutErrors.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSPutErrors.java index 91458e1419f..d330eb41dff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSPutErrors.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSPutErrors.java @@ -94,9 +94,21 @@ public void setErrors(List errors) { @Public @Unstable public static class ATSPutError { + /** + * Error code returned when no start time can be found when putting an + * entity. This occurs when the entity does not already exist in the + * store and it is put with no start time or events specified. + */ + public static final int NO_START_TIME = 1; + /** + * Error code returned if an IOException is encountered when putting an + * entity. + */ + public static final int IO_EXCEPTION = 2; + private String entityId; private String entityType; - private Integer errorCode; + private int errorCode; /** * Get the entity Id @@ -144,7 +156,7 @@ public void setEntityType(String entityType) { * @return an error code */ @XmlElement(name = "errorcode") - public Integer getErrorCode() { + public int getErrorCode() { return errorCode; } @@ -154,7 +166,7 @@ public Integer getErrorCode() { * @param errorCode * an error code */ - public void setErrorCode(Integer errorCode) { + public void setErrorCode(int errorCode) { this.errorCode = errorCode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5322ccd5de6..8c8ad16e8e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1041,6 +1041,10 @@ public class YarnConfiguration extends Configuration { /** ATS store class */ public static final String ATS_STORE = ATS_PREFIX + "store.class"; + /** ATS leveldb path */ + public static final String ATS_LEVELDB_PATH_PROPERTY = + ATS_PREFIX + "leveldb-apptimeline-store.path"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c50ea7b7087..cc8b12437ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1145,7 +1145,13 @@ Store class name for application timeline store yarn.ats.store.class - org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.MemoryApplicationTimelineStore + org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore + + + + Store file name for leveldb application timeline store + yarn.ats.leveldb-apptimeline-store.path + ${yarn.log.dir}/ats diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/apptimeline/TestApplicationTimelineRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/apptimeline/TestApplicationTimelineRecords.java index f2a6d3ef461..24d1ce91e62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/apptimeline/TestApplicationTimelineRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/apptimeline/TestApplicationTimelineRecords.java @@ -117,14 +117,14 @@ public void testATSPutErrors() { ATSPutError error1 = new ATSPutError(); error1.setEntityId("entity id 1"); error1.setEntityId("entity type 1"); - error1.setErrorCode(1); + error1.setErrorCode(ATSPutError.NO_START_TIME); atsPutErrors.addError(error1); List errors = new ArrayList(); errors.add(error1); ATSPutError error2 = new ATSPutError(); error2.setEntityId("entity id 2"); error2.setEntityId("entity type 2"); - error2.setErrorCode(2); + error2.setErrorCode(ATSPutError.IO_EXCEPTION); errors.add(error2); atsPutErrors.addErrors(errors); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index d314d026e99..59be859c7c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -167,6 +167,25 @@ jersey-test-framework-grizzly2 test + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + + commons-collections + commons-collections + + + + org.fusesource.leveldbjni + leveldbjni-all + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineReader.java index 97a217dc98a..e448ba8bcad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; +import java.io.IOException; import java.util.Collection; import java.util.EnumSet; import java.util.Set; @@ -78,13 +79,15 @@ enum Field { * retrieve (see {@link Field}). If the set of fields * contains {@link Field#LAST_EVENT_ONLY} and not * {@link Field#EVENTS}, the most recent event for - * each entity is retrieved. + * each entity is retrieved. If null, retrieves all + * fields. * @return An {@link ATSEntities} object. + * @throws IOException */ ATSEntities getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, NameValuePair primaryFilter, Collection secondaryFilters, - EnumSet fieldsToRetrieve); + EnumSet fieldsToRetrieve) throws IOException; /** * This method retrieves the entity information for a given entity. @@ -95,11 +98,13 @@ ATSEntities getEntities(String entityType, * retrieve (see {@link Field}). If the set of * fields contains {@link Field#LAST_EVENT_ONLY} and * not {@link Field#EVENTS}, the most recent event - * for each entity is retrieved. + * for each entity is retrieved. If null, retrieves + * all fields. * @return An {@link ATSEntity} object. + * @throws IOException */ ATSEntity getEntity(String entity, String entityType, EnumSet - fieldsToRetrieve); + fieldsToRetrieve) throws IOException; /** * This method retrieves the events for a list of entities all of the same @@ -118,8 +123,9 @@ ATSEntity getEntity(String entity, String entityType, EnumSet * @param eventTypes Restricts the events returned to the given types. If * null, events of all types will be returned. * @return An {@link ATSEvents} object. + * @throws IOException */ ATSEvents getEntityTimelines(String entityType, SortedSet entityIds, Long limit, Long windowStart, - Long windowEnd, Set eventTypes); + Long windowEnd, Set eventTypes) throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineWriter.java index b7bd0708e43..2a16833d980 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineWriter.java @@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import java.io.IOException; + /** * This interface is for storing application timeline information. */ @@ -37,7 +39,8 @@ public interface ApplicationTimelineWriter { * * @param data An {@link ATSEntities} object. * @return An {@link ATSPutErrors} object. + * @throws IOException */ - ATSPutErrors put(ATSEntities data); + ATSPutErrors put(ATSEntities data) throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityIdentifier.java similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityIdentifier.java index 26431f87569..d22e616fd1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityIdentifier.java @@ -26,12 +26,12 @@ */ @Private @Unstable -public class EntityId implements Comparable { +public class EntityIdentifier implements Comparable { private String id; private String type; - public EntityId(String id, String type) { + public EntityIdentifier(String id, String type) { this.id = id; this.type = type; } @@ -53,7 +53,7 @@ public String getType() { } @Override - public int compareTo(EntityId other) { + public int compareTo(EntityIdentifier other) { int c = type.compareTo(other.type); if (c != 0) return c; return id.compareTo(other.id); @@ -78,7 +78,7 @@ public boolean equals(Object obj) { return false; if (getClass() != obj.getClass()) return false; - EntityId other = (EntityId) obj; + EntityIdentifier other = (EntityIdentifier) obj; if (id == null) { if (other.id != null) return false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/GenericObjectMapper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/GenericObjectMapper.java new file mode 100644 index 00000000000..38ceb30c7d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/GenericObjectMapper.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.WritableUtils; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * A utility class providing methods for serializing and deserializing + * objects. The {@link #write(Object)}, {@link #read(byte[])} and {@link + * #write(java.io.DataOutputStream, Object)}, {@link + * #read(java.io.DataInputStream)} methods are used by the + * {@link LeveldbApplicationTimelineStore} to store and retrieve arbitrary + * JSON, while the {@link #writeReverseOrderedLong} and {@link + * #readReverseOrderedLong} methods are used to sort entities in descending + * start time order. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class GenericObjectMapper { + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final byte LONG = 0x1; + private static final byte INTEGER = 0x2; + private static final byte DOUBLE = 0x3; + private static final byte STRING = 0x4; + private static final byte BOOLEAN = 0x5; + private static final byte LIST = 0x6; + private static final byte MAP = 0x7; + + /** + * Serializes an Object into a byte array. Along with {@link #read(byte[]) }, + * can be used to serialize an Object and deserialize it into an Object of + * the same type without needing to specify the Object's type, + * as long as it is one of the JSON-compatible objects Long, Integer, + * Double, String, Boolean, List, or Map. The current implementation uses + * ObjectMapper to serialize complex objects (List and Map) while using + * Writable to serialize simpler objects, to produce fewer bytes. + * + * @param o An Object + * @return A byte array representation of the Object + * @throws IOException + */ + public static byte[] write(Object o) throws IOException { + if (o == null) + return EMPTY_BYTES; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + write(new DataOutputStream(baos), o); + return baos.toByteArray(); + } + + /** + * Serializes an Object and writes it to a DataOutputStream. Along with + * {@link #read(java.io.DataInputStream)}, can be used to serialize an Object + * and deserialize it into an Object of the same type without needing to + * specify the Object's type, as long as it is one of the JSON-compatible + * objects Long, Integer, Double, String, Boolean, List, or Map. The current + * implementation uses ObjectMapper to serialize complex objects (List and + * Map) while using Writable to serialize simpler objects, to produce fewer + * bytes. + * + * @param dos A DataOutputStream + * @param o An Object + * @throws IOException + */ + public static void write(DataOutputStream dos, Object o) + throws IOException { + if (o == null) + return; + if (o instanceof Long) { + dos.write(LONG); + WritableUtils.writeVLong(dos, (Long) o); + } else if(o instanceof Integer) { + dos.write(INTEGER); + WritableUtils.writeVInt(dos, (Integer) o); + } else if(o instanceof Double) { + dos.write(DOUBLE); + dos.writeDouble((Double) o); + } else if (o instanceof String) { + dos.write(STRING); + WritableUtils.writeString(dos, (String) o); + } else if (o instanceof Boolean) { + dos.write(BOOLEAN); + dos.writeBoolean((Boolean) o); + } else if (o instanceof List) { + dos.write(LIST); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(dos, o); + } else if (o instanceof Map) { + dos.write(MAP); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(dos, o); + } else { + throw new IOException("Couldn't serialize object"); + } + } + + /** + * Deserializes an Object from a byte array created with + * {@link #write(Object)}. + * + * @param b A byte array + * @return An Object + * @throws IOException + */ + public static Object read(byte[] b) throws IOException { + if (b == null || b.length == 0) + return null; + ByteArrayInputStream bais = new ByteArrayInputStream(b); + return read(new DataInputStream(bais)); + } + + /** + * Reads an Object from a DataInputStream whose data has been written with + * {@link #write(java.io.DataOutputStream, Object)}. + * + * @param dis A DataInputStream + * @return An Object, null if an unrecognized type + * @throws IOException + */ + public static Object read(DataInputStream dis) throws IOException { + byte code = (byte)dis.read(); + ObjectMapper mapper; + switch (code) { + case LONG: + return WritableUtils.readVLong(dis); + case INTEGER: + return WritableUtils.readVInt(dis); + case DOUBLE: + return dis.readDouble(); + case STRING: + return WritableUtils.readString(dis); + case BOOLEAN: + return dis.readBoolean(); + case LIST: + mapper = new ObjectMapper(); + return mapper.readValue(dis, ArrayList.class); + case MAP: + mapper = new ObjectMapper(); + return mapper.readValue(dis, HashMap.class); + default: + return null; + } + } + + /** + * Converts a long to a 8-byte array so that lexicographic ordering of the + * produced byte arrays sort the longs in descending order. + * + * @param l A long + * @return A byte array + */ + public static byte[] writeReverseOrderedLong(long l) { + byte[] b = new byte[8]; + b[0] = (byte)(0x7f ^ ((l >> 56) & 0xff)); + for (int i = 1; i < 7; i++) + b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff)); + b[7] = (byte)(0xff ^ (l & 0xff)); + return b; + } + + /** + * Reads 8 bytes from an array starting at the specified offset and + * converts them to a long. The bytes are assumed to have been created + * with {@link #writeReverseOrderedLong}. + * + * @param b A byte array + * @param offset An offset into the byte array + * @return A long + */ + public static long readReverseOrderedLong(byte[] b, int offset) { + long l = b[offset] & 0xff; + for (int i = 1; i < 8; i++) { + l = l << 8; + l = l | (b[offset+i]&0xff); + } + return l ^ 0x7fffffffffffffffl; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java new file mode 100644 index 00000000000..c2e93cab948 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java @@ -0,0 +1,854 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice + .apptimeline.GenericObjectMapper.readReverseOrderedLong; +import static org.apache.hadoop.yarn.server.applicationhistoryservice + .apptimeline.GenericObjectMapper.writeReverseOrderedLong; + +/** + * An implementation of an application timeline store backed by leveldb. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class LeveldbApplicationTimelineStore extends AbstractService + implements ApplicationTimelineStore { + private static final Log LOG = LogFactory + .getLog(LeveldbApplicationTimelineStore.class); + + private static final String FILENAME = "leveldb-apptimeline-store.ldb"; + + private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(); + private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(); + private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(); + + private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes(); + private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(); + private static final byte[] RELATED_COLUMN = "r".getBytes(); + private static final byte[] TIME_COLUMN = "t".getBytes(); + + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final int START_TIME_CACHE_SIZE = 10000; + + @SuppressWarnings("unchecked") + private final Map startTimeCache = + Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE)); + + private DB db; + + public LeveldbApplicationTimelineStore() { + super(LeveldbApplicationTimelineStore.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + Options options = new Options(); + options.createIfMissing(true); + JniDBFactory factory = new JniDBFactory(); + String path = conf.get(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY); + File p = new File(path); + if (!p.exists()) + if (!p.mkdirs()) + throw new IOException("Couldn't create directory for leveldb " + + "application timeline store " + path); + LOG.info("Using leveldb path " + path); + db = factory.open(new File(path, FILENAME), options); + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + IOUtils.cleanup(LOG, db); + super.serviceStop(); + } + + private static class KeyBuilder { + private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; + private byte[][] b; + private boolean[] useSeparator; + private int index; + private int length; + + public KeyBuilder(int size) { + b = new byte[size][]; + useSeparator = new boolean[size]; + index = 0; + length = 0; + } + + public static KeyBuilder newInstance() { + return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS); + } + + public KeyBuilder add(String s) { + return add(s.getBytes(), true); + } + + public KeyBuilder add(byte[] t) { + return add(t, false); + } + + public KeyBuilder add(byte[] t, boolean sep) { + b[index] = t; + useSeparator[index] = sep; + length += t.length; + if (sep) + length++; + index++; + return this; + } + + public byte[] getBytes() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(length); + for (int i = 0; i < index; i++) { + baos.write(b[i]); + if (i < index-1 && useSeparator[i]) + baos.write(0x0); + } + return baos.toByteArray(); + } + + public byte[] getBytesForLookup() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(length); + for (int i = 0; i < index; i++) { + baos.write(b[i]); + if (useSeparator[i]) + baos.write(0x0); + } + return baos.toByteArray(); + } + } + + private static class KeyParser { + private final byte[] b; + private int offset; + + public KeyParser(byte[] b, int offset) { + this.b = b; + this.offset = offset; + } + + public String getNextString() throws IOException { + if (offset >= b.length) + throw new IOException( + "tried to read nonexistent string from byte array"); + int i = 0; + while (offset+i < b.length && b[offset+i] != 0x0) + i++; + String s = new String(b, offset, i); + offset = offset + i + 1; + return s; + } + + public long getNextLong() throws IOException { + if (offset+8 >= b.length) + throw new IOException("byte array ran out when trying to read long"); + long l = readReverseOrderedLong(b, offset); + offset += 8; + return l; + } + + public int getOffset() { + return offset; + } + } + + @Override + public ATSEntity getEntity(String entity, String entityType, + EnumSet fields) throws IOException { + DBIterator iterator = null; + try { + byte[] revStartTime = getStartTime(entity, entityType, null, null, null); + if (revStartTime == null) + return null; + byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entityType).add(revStartTime).add(entity).getBytesForLookup(); + + iterator = db.iterator(); + iterator.seek(prefix); + + return getEntity(entity, entityType, + readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix, + prefix.length); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Read entity from a db iterator. If no information is found in the + * specified fields for this entity, return null. + */ + private static ATSEntity getEntity(String entity, String entityType, + Long startTime, EnumSet fields, DBIterator iterator, + byte[] prefix, int prefixlen) throws IOException { + if (fields == null) + fields = EnumSet.allOf(Field.class); + + ATSEntity atsEntity = new ATSEntity(); + boolean events = false; + boolean lastEvent = false; + if (fields.contains(Field.EVENTS)) { + events = true; + atsEntity.setEvents(new ArrayList()); + } else if (fields.contains(Field.LAST_EVENT_ONLY)) { + lastEvent = true; + atsEntity.setEvents(new ArrayList()); + } + else { + atsEntity.setEvents(null); + } + boolean relatedEntities = false; + if (fields.contains(Field.RELATED_ENTITIES)) { + relatedEntities = true; + atsEntity.setRelatedEntities(new HashMap>()); + } else { + atsEntity.setRelatedEntities(null); + } + boolean primaryFilters = false; + if (fields.contains(Field.PRIMARY_FILTERS)) { + primaryFilters = true; + atsEntity.setPrimaryFilters(new HashMap()); + } else { + atsEntity.setPrimaryFilters(null); + } + boolean otherInfo = false; + if (fields.contains(Field.OTHER_INFO)) { + otherInfo = true; + atsEntity.setOtherInfo(new HashMap()); + } else { + atsEntity.setOtherInfo(null); + } + + // iterate through the entity's entry, parsing information if it is part + // of a requested field + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefixlen, key)) + break; + if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) { + if (primaryFilters) { + atsEntity.addPrimaryFilter(parseRemainingKey(key, + prefixlen + PRIMARY_FILTER_COLUMN.length), + GenericObjectMapper.read(iterator.peekNext().getValue())); + } + } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { + if (otherInfo) { + atsEntity.addOtherInfo(parseRemainingKey(key, + prefixlen + OTHER_INFO_COLUMN.length), + GenericObjectMapper.read(iterator.peekNext().getValue())); + } + } else if (key[prefixlen] == RELATED_COLUMN[0]) { + if (relatedEntities) { + addRelatedEntity(atsEntity, key, + prefixlen + RELATED_COLUMN.length); + } + } else if (key[prefixlen] == TIME_COLUMN[0]) { + if (events || (lastEvent && atsEntity.getEvents().size() == 0)) { + ATSEvent event = getEntityEvent(null, key, prefixlen + + TIME_COLUMN.length, iterator.peekNext().getValue()); + if (event != null) { + atsEntity.addEvent(event); + } + } + } else { + LOG.warn(String.format("Found unexpected column for entity %s of " + + "type %s (0x%02x)", entity, entityType, key[prefixlen])); + } + } + + atsEntity.setEntityId(entity); + atsEntity.setEntityType(entityType); + atsEntity.setStartTime(startTime); + + return atsEntity; + } + + @Override + public ATSEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventType) throws IOException { + ATSEvents atsEvents = new ATSEvents(); + if (entityIds == null || entityIds.isEmpty()) + return atsEvents; + // create a lexicographically-ordered map from start time to entities + Map> startTimeMap = new TreeMap>(new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, + o2.length); + } + }); + DBIterator iterator = null; + try { + // look up start times for the specified entities + // skip entities with no start time + for (String entity : entityIds) { + byte[] startTime = getStartTime(entity, entityType, null, null, null); + if (startTime != null) { + List entities = startTimeMap.get(startTime); + if (entities == null) { + entities = new ArrayList(); + startTimeMap.put(startTime, entities); + } + entities.add(new EntityIdentifier(entity, entityType)); + } + } + for (Entry> entry : + startTimeMap.entrySet()) { + // look up the events matching the given parameters (limit, + // start time, end time, event types) for entities whose start times + // were found and add the entities to the return list + byte[] revStartTime = entry.getKey(); + for (EntityIdentifier entity : entry.getValue()) { + ATSEventsOfOneEntity atsEntity = new ATSEventsOfOneEntity(); + atsEntity.setEntityId(entity.getId()); + atsEntity.setEntityType(entityType); + atsEvents.addEvent(atsEntity); + KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entityType).add(revStartTime).add(entity.getId()) + .add(TIME_COLUMN); + byte[] prefix = kb.getBytesForLookup(); + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + byte[] revts = writeReverseOrderedLong(windowEnd); + kb.add(revts); + byte[] first = kb.getBytesForLookup(); + byte[] last = null; + if (windowStart != null) { + last = KeyBuilder.newInstance().add(prefix) + .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + iterator = db.iterator(); + for (iterator.seek(first); atsEntity.getEvents().size() < limit && + iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) || (last != null && + WritableComparator.compareBytes(key, 0, key.length, last, 0, + last.length) > 0)) + break; + ATSEvent event = getEntityEvent(eventType, key, prefix.length, + iterator.peekNext().getValue()); + if (event != null) + atsEntity.addEvent(event); + } + } + } + } finally { + IOUtils.cleanup(LOG, iterator); + } + return atsEvents; + } + + /** + * Returns true if the byte array begins with the specified prefix. + */ + private static boolean prefixMatches(byte[] prefix, int prefixlen, + byte[] b) { + if (b.length < prefixlen) + return false; + return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0, + prefixlen) == 0; + } + + @Override + public ATSEntities getEntities(String entityType, + Long limit, Long windowStart, Long windowEnd, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fields) throws IOException { + if (primaryFilter == null) { + // if no primary filter is specified, prefix the lookup with + // ENTITY_ENTRY_PREFIX + return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, + windowStart, windowEnd, secondaryFilters, fields); + } else { + // if a primary filter is specified, prefix the lookup with + // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + + // ENTITY_ENTRY_PREFIX + byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) + .add(primaryFilter.getName()) + .add(GenericObjectMapper.write(primaryFilter.getValue()), true) + .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); + return getEntityByTime(base, entityType, limit, windowStart, windowEnd, + secondaryFilters, fields); + } + } + + /** + * Retrieves a list of entities satisfying given parameters. + * + * @param base A byte array prefix for the lookup + * @param entityType The type of the entity + * @param limit A limit on the number of entities to return + * @param starttime The earliest entity start time to retrieve (exclusive) + * @param endtime The latest entity start time to retrieve (inclusive) + * @param secondaryFilters Filter pairs that the entities should match + * @param fields The set of fields to retrieve + * @return A list of entities + * @throws IOException + */ + private ATSEntities getEntityByTime(byte[] base, + String entityType, Long limit, Long starttime, Long endtime, + Collection secondaryFilters, EnumSet fields) + throws IOException { + DBIterator iterator = null; + try { + KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); + // only db keys matching the prefix (base + entity type) will be parsed + byte[] prefix = kb.getBytesForLookup(); + if (endtime == null) { + // if end time is null, place no restriction on end time + endtime = Long.MAX_VALUE; + } + // using end time, construct a first key that will be seeked to + byte[] revts = writeReverseOrderedLong(endtime); + kb.add(revts); + byte[] first = kb.getBytesForLookup(); + byte[] last = null; + if (starttime != null) { + // if start time is not null, set a last key that will not be + // iterated past + last = KeyBuilder.newInstance().add(base).add(entityType) + .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); + } + if (limit == null) { + // if limit is not specified, use the default + limit = DEFAULT_LIMIT; + } + + ATSEntities atsEntities = new ATSEntities(); + iterator = db.iterator(); + iterator.seek(first); + // iterate until one of the following conditions is met: limit is + // reached, there are no more keys, the key prefix no longer matches, + // or a start time has been specified and reached/exceeded + while (atsEntities.getEntities().size() < limit && iterator.hasNext()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) || (last != null && + WritableComparator.compareBytes(key, 0, key.length, last, 0, + last.length) > 0)) + break; + // read the start time and entity from the current key + KeyParser kp = new KeyParser(key, prefix.length); + Long startTime = kp.getNextLong(); + String entity = kp.getNextString(); + // parse the entity that owns this key, iterating over all keys for + // the entity + ATSEntity atsEntity = getEntity(entity, entityType, startTime, + fields, iterator, key, kp.getOffset()); + if (atsEntity == null) + continue; + // determine if the retrieved entity matches the provided secondary + // filters, and if so add it to the list of entities to return + boolean filterPassed = true; + if (secondaryFilters != null) { + for (NameValuePair filter : secondaryFilters) { + Object v = atsEntity.getOtherInfo().get(filter.getName()); + if (v == null) + v = atsEntity.getPrimaryFilters().get(filter.getName()); + if (v == null || !v.equals(filter.getValue())) { + filterPassed = false; + break; + } + } + } + if (filterPassed) + atsEntities.addEntity(atsEntity); + } + return atsEntities; + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Put a single entity. If there is an error, add a PutError to the given + * response. + */ + private void put(ATSEntity atsEntity, ATSPutErrors response) { + WriteBatch writeBatch = null; + try { + writeBatch = db.createWriteBatch(); + List events = atsEntity.getEvents(); + // look up the start time for the entity + byte[] revStartTime = getStartTime(atsEntity.getEntityId(), + atsEntity.getEntityType(), atsEntity.getStartTime(), events, + writeBatch); + if (revStartTime == null) { + // if no start time is found, add an error and return + ATSPutError error = new ATSPutError(); + error.setEntityId(atsEntity.getEntityId()); + error.setEntityType(atsEntity.getEntityType()); + error.setErrorCode(ATSPutError.NO_START_TIME); + response.addError(error); + return; + } + Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0); + Map primaryFilters = atsEntity.getPrimaryFilters(); + + // write event entries + if (events != null && !events.isEmpty()) { + for (ATSEvent event : events) { + byte[] revts = writeReverseOrderedLong(event.getTimestamp()); + byte[] key = createEntityEventKey(atsEntity.getEntityId(), + atsEntity.getEntityType(), revStartTime, revts, + event.getEventType()); + byte[] value = GenericObjectMapper.write(event.getEventInfo()); + writeBatch.put(key, value); + writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); + } + } + + // write related entity entries + Map> relatedEntities = + atsEntity.getRelatedEntities(); + if (relatedEntities != null && !relatedEntities.isEmpty()) { + for (Entry> relatedEntityList : + relatedEntities.entrySet()) { + String relatedEntityType = relatedEntityList.getKey(); + for (String relatedEntityId : relatedEntityList.getValue()) { + // look up start time of related entity + byte[] relatedEntityStartTime = getStartTime(relatedEntityId, + relatedEntityType, null, null, writeBatch); + if (relatedEntityStartTime == null) { + // if start time is not found, set start time of the related + // entity to the start time of this entity, and write it to the + // db and the cache + relatedEntityStartTime = revStartTime; + writeBatch.put(createStartTimeLookupKey(relatedEntityId, + relatedEntityType), relatedEntityStartTime); + startTimeCache.put(new EntityIdentifier(relatedEntityId, + relatedEntityType), revStartTimeLong); + } + // write reverse entry (related entity -> entity) + byte[] key = createReleatedEntityKey(relatedEntityId, + relatedEntityType, relatedEntityStartTime, + atsEntity.getEntityId(), atsEntity.getEntityType()); + writeBatch.put(key, EMPTY_BYTES); + // TODO: write forward entry (entity -> related entity)? + } + } + } + + // write primary filter entries + if (primaryFilters != null && !primaryFilters.isEmpty()) { + for (Entry primaryFilter : primaryFilters.entrySet()) { + byte[] key = createPrimaryFilterKey(atsEntity.getEntityId(), + atsEntity.getEntityType(), revStartTime, primaryFilter.getKey()); + byte[] value = GenericObjectMapper.write(primaryFilter.getValue()); + writeBatch.put(key, value); + writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); + } + } + + // write other info entries + Map otherInfo = atsEntity.getOtherInfo(); + if (otherInfo != null && !otherInfo.isEmpty()) { + for (Entry i : otherInfo.entrySet()) { + byte[] key = createOtherInfoKey(atsEntity.getEntityId(), + atsEntity.getEntityType(), revStartTime, i.getKey()); + byte[] value = GenericObjectMapper.write(i.getValue()); + writeBatch.put(key, value); + writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); + } + } + db.write(writeBatch); + } catch (IOException e) { + LOG.error("Error putting entity " + atsEntity.getEntityId() + + " of type " + atsEntity.getEntityType(), e); + ATSPutError error = new ATSPutError(); + error.setEntityId(atsEntity.getEntityId()); + error.setEntityType(atsEntity.getEntityType()); + error.setErrorCode(ATSPutError.IO_EXCEPTION); + response.addError(error); + } finally { + IOUtils.cleanup(LOG, writeBatch); + } + } + + /** + * For a given key / value pair that has been written to the db, + * write additional entries to the db for each primary filter. + */ + private static void writePrimaryFilterEntries(WriteBatch writeBatch, + Map primaryFilters, byte[] key, byte[] value) + throws IOException { + if (primaryFilters != null && !primaryFilters.isEmpty()) { + for (Entry p : primaryFilters.entrySet()) { + writeBatch.put(addPrimaryFilterToKey(p.getKey(), p.getValue(), + key), value); + } + } + } + + @Override + public ATSPutErrors put(ATSEntities atsEntities) { + ATSPutErrors response = new ATSPutErrors(); + for (ATSEntity atsEntity : atsEntities.getEntities()) { + put(atsEntity, response); + } + return response; + } + + /** + * Get the unique start time for a given entity as a byte array that sorts + * the timestamps in reverse order (see {@link + * GenericObjectMapper#writeReverseOrderedLong(long)}). + * + * @param entityId The id of the entity + * @param entityType The type of the entity + * @param startTime The start time of the entity, or null + * @param events A list of events for the entity, or null + * @param writeBatch A leveldb write batch, if the method is called by a + * put as opposed to a get + * @return A byte array + * @throws IOException + */ + private byte[] getStartTime(String entityId, String entityType, + Long startTime, List events, WriteBatch writeBatch) + throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + if (startTime == null) { + // start time is not provided, so try to look it up + if (startTimeCache.containsKey(entity)) { + // found the start time in the cache + startTime = startTimeCache.get(entity); + } else { + // try to look up the start time in the db + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + byte[] v = db.get(b); + if (v == null) { + // did not find the start time in the db + // if this is a put, try to set it from the provided events + if (events == null || writeBatch == null) { + // no events, or not a put, so return null + return null; + } + Long min = Long.MAX_VALUE; + for (ATSEvent e : events) + if (min > e.getTimestamp()) + min = e.getTimestamp(); + startTime = min; + // selected start time as minimum timestamp of provided events + // write start time to db and cache + writeBatch.put(b, writeReverseOrderedLong(startTime)); + startTimeCache.put(entity, startTime); + } else { + // found the start time in the db + startTime = readReverseOrderedLong(v, 0); + if (writeBatch != null) { + // if this is a put, re-add the start time to the cache + startTimeCache.put(entity, startTime); + } + } + } + } else { + // start time is provided + // TODO: verify start time in db as well as cache? + if (startTimeCache.containsKey(entity)) { + // if the start time is already in the cache, + // and it is different from the provided start time, + // use the one from the cache + if (!startTime.equals(startTimeCache.get(entity))) + startTime = startTimeCache.get(entity); + } else if (writeBatch != null) { + // if this is a put, write the provided start time to the db and the + // cache + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + writeBatch.put(b, writeReverseOrderedLong(startTime)); + startTimeCache.put(entity, startTime); + } + } + return writeReverseOrderedLong(startTime); + } + + /** + * Creates a key for looking up the start time of a given entity, + * of the form START_TIME_LOOKUP_PREFIX + entitytype + entity. + */ + private static byte[] createStartTimeLookupKey(String entity, + String entitytype) throws IOException { + return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX) + .add(entitytype).add(entity).getBytes(); + } + + /** + * Creates an index entry for the given key of the form + * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key. + */ + private static byte[] addPrimaryFilterToKey(String primaryFilterName, + Object primaryFilterValue, byte[] key) throws IOException { + return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) + .add(primaryFilterName) + .add(GenericObjectMapper.write(primaryFilterValue), true).add(key) + .getBytes(); + } + + /** + * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype + + * revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype. + */ + private static byte[] createEntityEventKey(String entity, String entitytype, + byte[] revStartTime, byte[] reveventtimestamp, String eventtype) + throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN) + .add(reveventtimestamp).add(eventtype).getBytes(); + } + + /** + * Creates an event object from the given key, offset, and value. If the + * event type is not contained in the specified set of event types, + * returns null. + */ + private static ATSEvent getEntityEvent(Set eventTypes, byte[] key, + int offset, byte[] value) throws IOException { + KeyParser kp = new KeyParser(key, offset); + long ts = kp.getNextLong(); + String tstype = kp.getNextString(); + if (eventTypes == null || eventTypes.contains(tstype)) { + ATSEvent event = new ATSEvent(); + event.setTimestamp(ts); + event.setEventType(tstype); + Object o = GenericObjectMapper.read(value); + if (o == null) { + event.setEventInfo(null); + } else if (o instanceof Map) { + @SuppressWarnings("unchecked") + Map m = (Map) o; + event.setEventInfo(m); + } else { + throw new IOException("Couldn't deserialize event info map"); + } + return event; + } + return null; + } + + /** + * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX + + * entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name. + */ + private static byte[] createPrimaryFilterKey(String entity, + String entitytype, byte[] revStartTime, String name) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype) + .add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name) + .getBytes(); + } + + /** + * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype + + * revstarttime + entity + OTHER_INFO_COLUMN + name. + */ + private static byte[] createOtherInfoKey(String entity, String entitytype, + byte[] revStartTime, String name) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype) + .add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name) + .getBytes(); + } + + /** + * Creates a string representation of the byte array from the given offset + * to the end of the array (for parsing other info keys). + */ + private static String parseRemainingKey(byte[] b, int offset) { + return new String(b, offset, b.length - offset); + } + + /** + * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + + * entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype + + * relatedentity. + */ + private static byte[] createReleatedEntityKey(String entity, + String entitytype, byte[] revStartTime, String relatedEntity, + String relatedEntityType) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype) + .add(revStartTime).add(entity).add(RELATED_COLUMN) + .add(relatedEntityType).add(relatedEntity).getBytes(); + } + + /** + * Parses the related entity from the given key at the given offset and + * adds it to the given entity. + */ + private static void addRelatedEntity(ATSEntity atsEntity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String type = kp.getNextString(); + String id = kp.getNextString(); + atsEntity.addRelatedEntity(type, id); + } + + /** + * Clears the cache to test reloading start times from leveldb (only for + * testing). + */ + @VisibleForTesting + void clearStartTimeCache() { + startTimeCache.clear(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java index 45f0a11d764..1c8e392cfe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java @@ -53,8 +53,8 @@ public class MemoryApplicationTimelineStore extends AbstractService implements ApplicationTimelineStore { - private Map entities = - new HashMap(); + private Map entities = + new HashMap(); public MemoryApplicationTimelineStore() { super(MemoryApplicationTimelineStore.class.getName()); @@ -125,7 +125,7 @@ public ATSEntity getEntity(String entityId, String entityType, if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.allOf(Field.class); } - ATSEntity entity = entities.get(new EntityId(entityId, entityType)); + ATSEntity entity = entities.get(new EntityIdentifier(entityId, entityType)); if (entity == null) { return null; } else { @@ -152,7 +152,7 @@ public ATSEvents getEntityTimelines(String entityType, windowEnd = Long.MAX_VALUE; } for (String entityId : entityIds) { - EntityId entityID = new EntityId(entityId, entityType); + EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); ATSEntity entity = entities.get(entityID); if (entity == null) { continue; @@ -184,8 +184,8 @@ public ATSEvents getEntityTimelines(String entityType, public ATSPutErrors put(ATSEntities data) { ATSPutErrors errors = new ATSPutErrors(); for (ATSEntity entity : data.getEntities()) { - EntityId entityId = - new EntityId(entity.getEntityId(), entity.getEntityType()); + EntityIdentifier entityId = + new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); // store entity info in memory ATSEntity existingEntity = entities.get(entityId); if (existingEntity == null) { @@ -210,7 +210,7 @@ public ATSPutErrors put(ATSEntities data) { ATSPutError error = new ATSPutError(); error.setEntityId(entityId.getId()); error.setEntityType(entityId.getType()); - error.setErrorCode(1); + error.setErrorCode(ATSPutError.NO_START_TIME); errors.addError(error); entities.remove(entityId); continue; @@ -242,12 +242,20 @@ public ATSPutErrors put(ATSEntities data) { continue; } for (String idStr : partRelatedEntities.getValue()) { - EntityId relatedEntityId = - new EntityId(idStr, partRelatedEntities.getKey()); + EntityIdentifier relatedEntityId = + new EntityIdentifier(idStr, partRelatedEntities.getKey()); ATSEntity relatedEntity = entities.get(relatedEntityId); if (relatedEntity != null) { relatedEntity.addRelatedEntity( existingEntity.getEntityType(), existingEntity.getEntityId()); + } else { + relatedEntity = new ATSEntity(); + relatedEntity.setEntityId(relatedEntityId.getId()); + relatedEntity.setEntityType(relatedEntityId.getType()); + relatedEntity.setStartTime(existingEntity.getStartTime()); + relatedEntity.addRelatedEntity(existingEntity.getEntityType(), + existingEntity.getEntityId()); + entities.put(relatedEntityId, relatedEntity); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java index 4ea501d89a8..063b67afd07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -45,6 +46,8 @@ import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; @@ -64,6 +67,8 @@ //TODO: support XML serialization/deserialization public class ATSWebServices { + private static final Log LOG = LogFactory.getLog(ATSWebServices.class); + private ApplicationTimelineStore store; @Inject @@ -143,6 +148,10 @@ public ATSEntities getEntities( "windowStart, windowEnd or limit is not a numeric value."); } catch (IllegalArgumentException e) { throw new BadRequestException("requested invalid field."); + } catch (IOException e) { + LOG.error("Error getting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); } if (entities == null) { return new ATSEntities(); @@ -171,6 +180,10 @@ public ATSEntity getEntity( } catch (IllegalArgumentException e) { throw new BadRequestException( "requested invalid field."); + } catch (IOException e) { + LOG.error("Error getting entity", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); } if (entity == null) { throw new WebApplicationException(Response.Status.NOT_FOUND); @@ -206,6 +219,10 @@ public ATSEvents getEvents( } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); + } catch (IOException e) { + LOG.error("Error getting entity timelines", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); } if (events == null) { return new ATSEvents(); @@ -228,7 +245,13 @@ public ATSPutErrors postEntities( if (entities == null) { return new ATSPutErrors(); } - return store.put(entities); + try { + return store.put(entities); + } catch (IOException e) { + LOG.error("Error putting entities", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } } private void init(HttpServletResponse response) { @@ -275,7 +298,17 @@ private static EnumSet parseFieldsStr(String str, String delimiter) { String[] strs = str.split(delimiter); List fieldList = new ArrayList(); for (String s : strs) { - fieldList.add(Field.valueOf(s.toUpperCase())); + s = s.trim().toUpperCase(); + if (s.equals("EVENTS")) + fieldList.add(Field.EVENTS); + else if (s.equals("LASTEVENTONLY")) + fieldList.add(Field.LAST_EVENT_ONLY); + else if (s.equals("RELATEDENTITIES")) + fieldList.add(Field.RELATED_ENTITIES); + else if (s.equals("PRIMARYFILTERS")) + fieldList.add(Field.PRIMARY_FILTERS); + else if (s.equals("OTHERINFO")) + fieldList.add(Field.OTHER_INFO); } if (fieldList.size() == 0) return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java index 5825af192b8..9afa5c0234a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -71,7 +73,7 @@ public class ApplicationTimelineStoreTestUtils { /** * Load test data into the given store */ - protected void loadTestData() { + protected void loadTestData() throws IOException { ATSEntities atsEntities = new ATSEntities(); Map primaryFilters = new HashMap(); primaryFilters.put("user", "username"); @@ -126,7 +128,7 @@ protected void loadTestData() { response = store.put(atsEntities); assertEquals(0, response.getErrors().size()); atsEntities.setEntities(Collections.singletonList(createEntity(entity1b, - entityType1, 123l, Collections.singletonList(ev2), null, + entityType1, 789l, Collections.singletonList(ev2), null, primaryFilters, otherInfo2))); response = store.put(atsEntities); assertEquals(0, response.getErrors().size()); @@ -138,11 +140,11 @@ protected void loadTestData() { ATSPutError error = response.getErrors().get(0); assertEquals("badentityid", error.getEntityId()); assertEquals("badentity", error.getEntityType()); - assertEquals((Integer) 1, error.getErrorCode()); + assertEquals(ATSPutError.NO_START_TIME, error.getErrorCode()); } /** - * Load veification data + * Load verification data */ protected void loadVerificationData() throws Exception { userFilter = new NameValuePair("user", @@ -197,7 +199,7 @@ protected void loadVerificationData() throws Exception { events2.add(ev4); } - public void testGetSingleEntity() { + public void testGetSingleEntity() throws IOException { // test getting entity info verifyEntityInfo(null, null, null, null, null, null, store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class))); @@ -222,6 +224,10 @@ public void testGetSingleEntity() { null, null, null, store.getEntity(entity1, entityType1, EnumSet.of(Field.LAST_EVENT_ONLY))); + verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, store.getEntity(entity1b, entityType1, + null)); + verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null, store.getEntity(entity1, entityType1, EnumSet.of(Field.PRIMARY_FILTERS))); @@ -234,7 +240,7 @@ public void testGetSingleEntity() { EnumSet.of(Field.RELATED_ENTITIES))); } - public void testGetEntities() { + public void testGetEntities() throws IOException { // test getting entities assertEquals("nonzero entities size for nonexistent type", 0, store.getEntities("type_0", null, null, null, null, null, @@ -305,7 +311,7 @@ public void testGetEntities() { primaryFilters, otherInfo, entities.get(1)); } - public void testGetEntitiesWithPrimaryFilters() { + public void testGetEntitiesWithPrimaryFilters() throws IOException { // test using primary filter assertEquals("nonzero entities size for primary filter", 0, store.getEntities("type_1", null, null, null, @@ -361,7 +367,7 @@ public void testGetEntitiesWithPrimaryFilters() { primaryFilters, otherInfo, entities.get(1)); } - public void testGetEntitiesWithSecondaryFilters() { + public void testGetEntitiesWithSecondaryFilters() throws IOException { // test using secondary filter List entities = store.getEntities("type_1", null, null, null, null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities(); @@ -388,7 +394,7 @@ public void testGetEntitiesWithSecondaryFilters() { assertEquals(0, entities.size()); } - public void testGetEvents() { + public void testGetEvents() throws IOException { // test getting entity timelines SortedSet sortedSet = new TreeSet(); sortedSet.add(entity1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestGenericObjectMapper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestGenericObjectMapper.java new file mode 100644 index 00000000000..4bb453a41be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestGenericObjectMapper.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.WritableComparator; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestGenericObjectMapper { + + @Test + public void testEncoding() { + testEncoding(Long.MAX_VALUE); + testEncoding(Long.MIN_VALUE); + testEncoding(0l); + testEncoding(128l); + testEncoding(256l); + testEncoding(512l); + testEncoding(-256l); + } + + private static void testEncoding(long l) { + byte[] b = GenericObjectMapper.writeReverseOrderedLong(l); + assertEquals("error decoding", l, + GenericObjectMapper.readReverseOrderedLong(b, 0)); + byte[] buf = new byte[16]; + System.arraycopy(b, 0, buf, 5, 8); + assertEquals("error decoding at offset", l, + GenericObjectMapper.readReverseOrderedLong(buf, 5)); + if (l > Long.MIN_VALUE) { + byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1); + assertEquals("error preserving ordering", 1, + WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length)); + } + if (l < Long.MAX_VALUE) { + byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1); + assertEquals("error preserving ordering", 1, + WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length)); + } + } + + private static void verify(Object o) throws IOException { + assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o))); + } + + @Test + public void testValueTypes() throws IOException { + verify(42l); + verify(42); + verify(1.23); + verify("abc"); + verify(true); + List list = new ArrayList(); + list.add("123"); + list.add("abc"); + verify(list); + Map map = new HashMap(); + map.put("k1","v1"); + map.put("k2","v2"); + verify(map); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java new file mode 100644 index 00000000000..b868049c4fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; +import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestLeveldbApplicationTimelineStore + extends ApplicationTimelineStoreTestUtils { + private FileContext fsContext; + private File fsPath; + + @Before + public void setup() throws Exception { + fsContext = FileContext.getLocalFSFileContext(); + Configuration conf = new Configuration(); + fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY, + fsPath.getAbsolutePath()); + store = new LeveldbApplicationTimelineStore(); + store.init(conf); + store.start(); + loadTestData(); + loadVerificationData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + ((LeveldbApplicationTimelineStore)store).clearStartTimeCache(); + super.testGetSingleEntity(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java index aa88b74a901..07a3955bf67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java @@ -23,6 +23,7 @@ import org.junit.Before; import org.junit.Test; +import java.io.IOException; public class TestMemoryApplicationTimelineStore extends ApplicationTimelineStoreTestUtils { @@ -46,27 +47,27 @@ public ApplicationTimelineStore getApplicationTimelineStore() { } @Test - public void testGetSingleEntity() { + public void testGetSingleEntity() throws IOException { super.testGetSingleEntity(); } @Test - public void testGetEntities() { + public void testGetEntities() throws IOException { super.testGetEntities(); } @Test - public void testGetEntitiesWithPrimaryFilters() { + public void testGetEntitiesWithPrimaryFilters() throws IOException { super.testGetEntitiesWithPrimaryFilters(); } @Test - public void testGetEntitiesWithSecondaryFilters() { + public void testGetEntitiesWithSecondaryFilters() throws IOException { super.testGetEntitiesWithSecondaryFilters(); } @Test - public void testGetEvents() { + public void testGetEvents() throws IOException { super.testGetEvents(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestATSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestATSWebServices.java index 1ff73ff35a2..58a826c9ac0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestATSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestATSWebServices.java @@ -156,6 +156,43 @@ public void testGetEntity() throws Exception { Assert.assertEquals(4, entity.getOtherInfo().size()); } + @Test + public void testGetEntityFields1() throws Exception { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("apptimeline") + .path("type_1").path("id_1").queryParam("fields", "events,otherinfo") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + ATSEntity entity = response.getEntity(ATSEntity.class); + Assert.assertNotNull(entity); + Assert.assertEquals("id_1", entity.getEntityId()); + Assert.assertEquals("type_1", entity.getEntityType()); + Assert.assertEquals(123l, entity.getStartTime().longValue()); + Assert.assertEquals(2, entity.getEvents().size()); + Assert.assertEquals(0, entity.getPrimaryFilters().size()); + Assert.assertEquals(4, entity.getOtherInfo().size()); + } + + @Test + public void testGetEntityFields2() throws Exception { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("apptimeline") + .path("type_1").path("id_1").queryParam("fields", "lasteventonly," + + "primaryfilters,relatedentities") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + ATSEntity entity = response.getEntity(ATSEntity.class); + Assert.assertNotNull(entity); + Assert.assertEquals("id_1", entity.getEntityId()); + Assert.assertEquals("type_1", entity.getEntityType()); + Assert.assertEquals(123l, entity.getStartTime().longValue()); + Assert.assertEquals(1, entity.getEvents().size()); + Assert.assertEquals(2, entity.getPrimaryFilters().size()); + Assert.assertEquals(0, entity.getOtherInfo().size()); + } + @Test public void testGetEvents() throws Exception { WebResource r = resource();