YARN-1717. Enabled periodically discarding old data in LeveldbTimelineStore. Contributed by Billie Rinaldi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1577693 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-03-14 20:35:00 +00:00
parent 8a2a741595
commit b3ea4aebff
7 changed files with 759 additions and 131 deletions

View File

@ -155,6 +155,9 @@ Release 2.4.0 - UNRELEASED
transferred containers from previous app-attempts to new AMs after YARN-1490.
(Jian He via vinodkv)
YARN-1717. Enabled periodically discarding old data in LeveldbTimelineStore.
(Billie Rinaldi via zjshen)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -1100,6 +1100,17 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
/** Timeline service enable data age off */
public static final String TIMELINE_SERVICE_TTL_ENABLE =
TIMELINE_SERVICE_PREFIX + "ttl-enable";
/** Timeline service length of time to retain data */
public static final String TIMELINE_SERVICE_TTL_MS =
TIMELINE_SERVICE_PREFIX + "ttl-ms";
public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
1000 * 60 * 60 * 24 * 7;
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
@ -1107,16 +1118,36 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
/** Timeline service leveldb read cache (uncompressed blocks) */
public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
100 * 1024 * 1024;
/** Timeline service leveldb start time read cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
public static final int
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE = 10000;
/** Timeline service leveldb start time write cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
public static final int
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE = 10000;
/** Timeline service leveldb interval to wait between deletion rounds */
public static final String TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
TIMELINE_SERVICE_LEVELDB_PREFIX + "ttl-interval-ms";
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
1000 * 60 * 5;
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -1107,23 +1107,59 @@
</property>
<property>
<description>The https adddress of the timeline service web application.</description>
<description>The https address of the timeline service web application.</description>
<name>yarn.timeline-service.webapp.https.address</name>
<value>${yarn.timeline-service.hostname}:8190</value>
</property>
<property>
<description>Store class name for timeline store</description>
<description>Store class name for timeline store.</description>
<name>yarn.timeline-service.store-class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore</value>
</property>
<property>
<description>Store file name for leveldb timeline store</description>
<description>Enable age off of timeline store data.</description>
<name>yarn.timeline-service.ttl-enable</name>
<value>true</value>
</property>
<property>
<description>Time to live for timeline store data in milliseconds.</description>
<name>yarn.timeline-service.ttl-ms</name>
<value>604800000</value>
</property>
<property>
<description>Store file name for leveldb timeline store.</description>
<name>yarn.timeline-service.leveldb-timeline-store.path</name>
<value>${yarn.log.dir}/timeline</value>
</property>
<property>
<description>Length of time to wait between deletion cycles of leveldb timeline store in milliseconds.</description>
<name>yarn.timeline-service.leveldb-timeline-store.ttl-interval-ms</name>
<value>300000</value>
</property>
<property>
<description>Size of read cache for uncompressed blocks for leveldb timeline store in bytes.</description>
<name>yarn.timeline-service.leveldb-timeline-store.read-cache-size</name>
<value>104857600</value>
</property>
<property>
<description>Size of cache for recently read entity start times for leveldb timeline store in number of entities.</description>
<name>yarn.timeline-service.leveldb-timeline-store.start-time-read-cache-size</name>
<value>10000</value>
</property>
<property>
<description>Size of cache for recently written entity start times for leveldb timeline store in number of entities.</description>
<name>yarn.timeline-service.leveldb-timeline-store.start-time-write-cache-size</name>
<value>10000</value>
</property>
<property>
<description>Handler thread count to serve the client RPC requests.</description>
<name>yarn.timeline-service.handler-thread-count</name>

View File

@ -56,7 +56,7 @@ public class GenericObjectMapper {
*
* @param o An Object
* @return A byte array representation of the Object
* @throws IOException
* @throws IOException if there is a write error
*/
public static byte[] write(Object o) throws IOException {
if (o == null) {
@ -71,7 +71,7 @@ public static byte[] write(Object o) throws IOException {
*
* @param b A byte array
* @return An Object
* @throws IOException
* @throws IOException if there is a read error
*/
public static Object read(byte[] b) throws IOException {
return read(b, 0);
@ -84,7 +84,7 @@ public static Object read(byte[] b) throws IOException {
* @param b A byte array
* @param offset Offset into the array
* @return An Object
* @throws IOException
* @throws IOException if there is a read error
*/
public static Object read(byte[] b, int offset) throws IOException {
if (b == null || b.length == 0) {

View File

@ -259,7 +259,9 @@ public TimelinePutResponse postEntities(
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
}
LOG.info("Storing entities: " + CSV_JOINER.join(entityIDs));
if (LOG.isDebugEnabled()) {
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
}
return store.put(entities);
} catch (IOException e) {
LOG.error("Error putting entities", e);

View File

@ -19,17 +19,29 @@
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
import org.iq80.leveldb.DBIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@ -48,6 +60,7 @@ public void setup() throws Exception {
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath());
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
store = new LeveldbTimelineStore();
store.init(conf);
store.start();
@ -105,4 +118,119 @@ public void testCacheSizes() {
assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
}
private boolean deleteNextEntity(String entityType, byte[] ts)
throws IOException, InterruptedException {
DBIterator iterator = null;
DBIterator pfIterator = null;
try {
iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
iterator, pfIterator, false);
} finally {
IOUtils.cleanup(null, iterator, pfIterator);
}
}
@Test
public void testGetEntityTypes() throws IOException {
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
assertEquals(2, entityTypes.size());
assertEquals(entityType1, entityTypes.get(0));
assertEquals(entityType2, entityTypes.get(1));
}
@Test
public void testDeleteEntities() throws IOException, InterruptedException {
assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
null).getEntities().size());
assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
null).getEntities().size());
assertEquals(false, deleteNextEntity(entityType1,
writeReverseOrderedLong(122l)));
assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
null).getEntities().size());
assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
null).getEntities().size());
assertEquals(true, deleteNextEntity(entityType1,
writeReverseOrderedLong(123l)));
List<TimelineEntity> entities =
store.getEntities("type_2", null, null, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
EMPTY_MAP, entities.get(0));
entities = store.getEntities("type_1", null, null, null, userFilter, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
((LeveldbTimelineStore)store).discardOldEntities(-123l);
assertEquals(1, store.getEntities("type_1", null, null, null, null, null,
null).getEntities().size());
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
null).getEntities().size());
assertEquals(1, ((LeveldbTimelineStore)store).getEntityTypes().size());
((LeveldbTimelineStore)store).discardOldEntities(123l);
assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
null).getEntities().size());
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
null).getEntities().size());
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
null, null).getEntities().size());
}
@Test
public void testDeleteEntitiesPrimaryFilters()
throws IOException, InterruptedException {
Map<String, Set<Object>> primaryFilter =
Collections.singletonMap("user", Collections.singleton(
(Object) "otheruser"));
TimelineEntities atsEntities = new TimelineEntities();
atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b,
entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter,
null)));
TimelinePutResponse response = store.put(atsEntities);
assertEquals(0, response.getErrors().size());
NameValuePair pfPair = new NameValuePair("user", "otheruser");
List<TimelineEntity> entities = store.getEntities("type_1", null, null,
null, pfPair, null, null).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0));
entities = store.getEntities("type_1", null, null, null,
userFilter, null, null).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
((LeveldbTimelineStore)store).discardOldEntities(-123l);
assertEquals(1, store.getEntities("type_1", null, null, null, pfPair, null,
null).getEntities().size());
assertEquals(2, store.getEntities("type_1", null, null, null, userFilter,
null, null).getEntities().size());
((LeveldbTimelineStore)store).discardOldEntities(123l);
assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
null).getEntities().size());
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
null).getEntities().size());
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
assertEquals(0, store.getEntities("type_1", null, null, null, pfPair, null,
null).getEntities().size());
assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
null, null).getEntities().size());
}
}