YARN-1717. Enabled periodically discarding old data in LeveldbTimelineStore. Contributed by Billie Rinaldi.
svn merge --ignore-ancestry -c 1577693 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1577695 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fd118493c6
commit
f1947fcb28
|
@ -140,6 +140,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
transferred containers from previous app-attempts to new AMs after YARN-1490.
|
transferred containers from previous app-attempts to new AMs after YARN-1490.
|
||||||
(Jian He via vinodkv)
|
(Jian He via vinodkv)
|
||||||
|
|
||||||
|
YARN-1717. Enabled periodically discarding old data in LeveldbTimelineStore.
|
||||||
|
(Billie Rinaldi via zjshen)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
||||||
|
|
|
@ -1100,6 +1100,17 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String TIMELINE_SERVICE_STORE =
|
public static final String TIMELINE_SERVICE_STORE =
|
||||||
TIMELINE_SERVICE_PREFIX + "store-class";
|
TIMELINE_SERVICE_PREFIX + "store-class";
|
||||||
|
|
||||||
|
/** Timeline service enable data age off */
|
||||||
|
public static final String TIMELINE_SERVICE_TTL_ENABLE =
|
||||||
|
TIMELINE_SERVICE_PREFIX + "ttl-enable";
|
||||||
|
|
||||||
|
/** Timeline service length of time to retain data */
|
||||||
|
public static final String TIMELINE_SERVICE_TTL_MS =
|
||||||
|
TIMELINE_SERVICE_PREFIX + "ttl-ms";
|
||||||
|
|
||||||
|
public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
|
||||||
|
1000 * 60 * 60 * 24 * 7;
|
||||||
|
|
||||||
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
|
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
|
||||||
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
|
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
|
||||||
|
|
||||||
|
@ -1107,16 +1118,36 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
|
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
|
||||||
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
|
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
|
||||||
|
|
||||||
|
/** Timeline service leveldb read cache (uncompressed blocks) */
|
||||||
|
public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
|
||||||
|
TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
|
||||||
|
|
||||||
|
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
|
||||||
|
100 * 1024 * 1024;
|
||||||
|
|
||||||
/** Timeline service leveldb start time read cache (number of entities) */
|
/** Timeline service leveldb start time read cache (number of entities) */
|
||||||
public static final String
|
public static final String
|
||||||
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
|
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
|
||||||
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
|
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
|
||||||
|
|
||||||
|
public static final int
|
||||||
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE = 10000;
|
||||||
|
|
||||||
/** Timeline service leveldb start time write cache (number of entities) */
|
/** Timeline service leveldb start time write cache (number of entities) */
|
||||||
public static final String
|
public static final String
|
||||||
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
|
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
|
||||||
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
|
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
|
||||||
|
|
||||||
|
public static final int
|
||||||
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE = 10000;
|
||||||
|
|
||||||
|
/** Timeline service leveldb interval to wait between deletion rounds */
|
||||||
|
public static final String TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
|
||||||
|
TIMELINE_SERVICE_LEVELDB_PREFIX + "ttl-interval-ms";
|
||||||
|
|
||||||
|
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
|
||||||
|
1000 * 60 * 5;
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// Other Configs
|
// Other Configs
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
|
|
|
@ -1107,23 +1107,59 @@
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>The https adddress of the timeline service web application.</description>
|
<description>The https address of the timeline service web application.</description>
|
||||||
<name>yarn.timeline-service.webapp.https.address</name>
|
<name>yarn.timeline-service.webapp.https.address</name>
|
||||||
<value>${yarn.timeline-service.hostname}:8190</value>
|
<value>${yarn.timeline-service.hostname}:8190</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Store class name for timeline store</description>
|
<description>Store class name for timeline store.</description>
|
||||||
<name>yarn.timeline-service.store-class</name>
|
<name>yarn.timeline-service.store-class</name>
|
||||||
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore</value>
|
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Store file name for leveldb timeline store</description>
|
<description>Enable age off of timeline store data.</description>
|
||||||
|
<name>yarn.timeline-service.ttl-enable</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Time to live for timeline store data in milliseconds.</description>
|
||||||
|
<name>yarn.timeline-service.ttl-ms</name>
|
||||||
|
<value>604800000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Store file name for leveldb timeline store.</description>
|
||||||
<name>yarn.timeline-service.leveldb-timeline-store.path</name>
|
<name>yarn.timeline-service.leveldb-timeline-store.path</name>
|
||||||
<value>${yarn.log.dir}/timeline</value>
|
<value>${yarn.log.dir}/timeline</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Length of time to wait between deletion cycles of leveldb timeline store in milliseconds.</description>
|
||||||
|
<name>yarn.timeline-service.leveldb-timeline-store.ttl-interval-ms</name>
|
||||||
|
<value>300000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Size of read cache for uncompressed blocks for leveldb timeline store in bytes.</description>
|
||||||
|
<name>yarn.timeline-service.leveldb-timeline-store.read-cache-size</name>
|
||||||
|
<value>104857600</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Size of cache for recently read entity start times for leveldb timeline store in number of entities.</description>
|
||||||
|
<name>yarn.timeline-service.leveldb-timeline-store.start-time-read-cache-size</name>
|
||||||
|
<value>10000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Size of cache for recently written entity start times for leveldb timeline store in number of entities.</description>
|
||||||
|
<name>yarn.timeline-service.leveldb-timeline-store.start-time-write-cache-size</name>
|
||||||
|
<value>10000</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Handler thread count to serve the client RPC requests.</description>
|
<description>Handler thread count to serve the client RPC requests.</description>
|
||||||
<name>yarn.timeline-service.handler-thread-count</name>
|
<name>yarn.timeline-service.handler-thread-count</name>
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class GenericObjectMapper {
|
||||||
*
|
*
|
||||||
* @param o An Object
|
* @param o An Object
|
||||||
* @return A byte array representation of the Object
|
* @return A byte array representation of the Object
|
||||||
* @throws IOException
|
* @throws IOException if there is a write error
|
||||||
*/
|
*/
|
||||||
public static byte[] write(Object o) throws IOException {
|
public static byte[] write(Object o) throws IOException {
|
||||||
if (o == null) {
|
if (o == null) {
|
||||||
|
@ -71,7 +71,7 @@ public class GenericObjectMapper {
|
||||||
*
|
*
|
||||||
* @param b A byte array
|
* @param b A byte array
|
||||||
* @return An Object
|
* @return An Object
|
||||||
* @throws IOException
|
* @throws IOException if there is a read error
|
||||||
*/
|
*/
|
||||||
public static Object read(byte[] b) throws IOException {
|
public static Object read(byte[] b) throws IOException {
|
||||||
return read(b, 0);
|
return read(b, 0);
|
||||||
|
@ -84,7 +84,7 @@ public class GenericObjectMapper {
|
||||||
* @param b A byte array
|
* @param b A byte array
|
||||||
* @param offset Offset into the array
|
* @param offset Offset into the array
|
||||||
* @return An Object
|
* @return An Object
|
||||||
* @throws IOException
|
* @throws IOException if there is a read error
|
||||||
*/
|
*/
|
||||||
public static Object read(byte[] b, int offset) throws IOException {
|
public static Object read(byte[] b, int offset) throws IOException {
|
||||||
if (b == null || b.length == 0) {
|
if (b == null || b.length == 0) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -259,7 +259,9 @@ public class TimelineWebServices {
|
||||||
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Storing entities: " + CSV_JOINER.join(entityIDs));
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
|
||||||
|
}
|
||||||
return store.put(entities);
|
return store.put(entities);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Error putting entities", e);
|
LOG.error("Error putting entities", e);
|
||||||
|
|
|
@ -19,17 +19,29 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
|
||||||
|
import org.iq80.leveldb.DBIterator;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -48,6 +60,7 @@ public class TestLeveldbTimelineStore
|
||||||
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
|
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
|
||||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
|
||||||
fsPath.getAbsolutePath());
|
fsPath.getAbsolutePath());
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
|
||||||
store = new LeveldbTimelineStore();
|
store = new LeveldbTimelineStore();
|
||||||
store.init(conf);
|
store.init(conf);
|
||||||
store.start();
|
store.start();
|
||||||
|
@ -105,4 +118,119 @@ public class TestLeveldbTimelineStore
|
||||||
assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
|
assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean deleteNextEntity(String entityType, byte[] ts)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
DBIterator iterator = null;
|
||||||
|
DBIterator pfIterator = null;
|
||||||
|
try {
|
||||||
|
iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
|
||||||
|
pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
|
||||||
|
return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
|
||||||
|
iterator, pfIterator, false);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(null, iterator, pfIterator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntityTypes() throws IOException {
|
||||||
|
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
|
||||||
|
assertEquals(2, entityTypes.size());
|
||||||
|
assertEquals(entityType1, entityTypes.get(0));
|
||||||
|
assertEquals(entityType2, entityTypes.get(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteEntities() throws IOException, InterruptedException {
|
||||||
|
assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
|
||||||
|
assertEquals(false, deleteNextEntity(entityType1,
|
||||||
|
writeReverseOrderedLong(122l)));
|
||||||
|
assertEquals(2, store.getEntities("type_1", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(1, store.getEntities("type_2", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
|
||||||
|
assertEquals(true, deleteNextEntity(entityType1,
|
||||||
|
writeReverseOrderedLong(123l)));
|
||||||
|
List<TimelineEntity> entities =
|
||||||
|
store.getEntities("type_2", null, null, null, null, null,
|
||||||
|
EnumSet.allOf(Field.class)).getEntities();
|
||||||
|
assertEquals(1, entities.size());
|
||||||
|
verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
|
||||||
|
entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
|
||||||
|
EMPTY_MAP, entities.get(0));
|
||||||
|
entities = store.getEntities("type_1", null, null, null, userFilter, null,
|
||||||
|
EnumSet.allOf(Field.class)).getEntities();
|
||||||
|
assertEquals(1, entities.size());
|
||||||
|
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||||
|
primaryFilters, otherInfo, entities.get(0));
|
||||||
|
|
||||||
|
((LeveldbTimelineStore)store).discardOldEntities(-123l);
|
||||||
|
assertEquals(1, store.getEntities("type_1", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(1, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||||
|
|
||||||
|
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
||||||
|
assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||||
|
assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
|
||||||
|
null, null).getEntities().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteEntitiesPrimaryFilters()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Map<String, Set<Object>> primaryFilter =
|
||||||
|
Collections.singletonMap("user", Collections.singleton(
|
||||||
|
(Object) "otheruser"));
|
||||||
|
TimelineEntities atsEntities = new TimelineEntities();
|
||||||
|
atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b,
|
||||||
|
entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter,
|
||||||
|
null)));
|
||||||
|
TimelinePutResponse response = store.put(atsEntities);
|
||||||
|
assertEquals(0, response.getErrors().size());
|
||||||
|
|
||||||
|
NameValuePair pfPair = new NameValuePair("user", "otheruser");
|
||||||
|
List<TimelineEntity> entities = store.getEntities("type_1", null, null,
|
||||||
|
null, pfPair, null, null).getEntities();
|
||||||
|
assertEquals(1, entities.size());
|
||||||
|
verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
|
||||||
|
EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0));
|
||||||
|
|
||||||
|
entities = store.getEntities("type_1", null, null, null,
|
||||||
|
userFilter, null, null).getEntities();
|
||||||
|
assertEquals(2, entities.size());
|
||||||
|
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||||
|
primaryFilters, otherInfo, entities.get(0));
|
||||||
|
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
|
||||||
|
primaryFilters, otherInfo, entities.get(1));
|
||||||
|
|
||||||
|
((LeveldbTimelineStore)store).discardOldEntities(-123l);
|
||||||
|
assertEquals(1, store.getEntities("type_1", null, null, null, pfPair, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(2, store.getEntities("type_1", null, null, null, userFilter,
|
||||||
|
null, null).getEntities().size());
|
||||||
|
|
||||||
|
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
||||||
|
assertEquals(0, store.getEntities("type_1", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(0, store.getEntities("type_2", null, null, null, null, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||||
|
|
||||||
|
assertEquals(0, store.getEntities("type_1", null, null, null, pfPair, null,
|
||||||
|
null).getEntities().size());
|
||||||
|
assertEquals(0, store.getEntities("type_1", null, null, null, userFilter,
|
||||||
|
null, null).getEntities().size());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue