YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.

This commit is contained in:
Zhijie Shen 2015-05-07 10:01:51 -07:00
parent 8e991f4b1d
commit daf3e4ef8b
12 changed files with 2907 additions and 66 deletions

View File

@ -108,6 +108,9 @@ Release 2.8.0 - UNRELEASED
YARN-2619. Added NodeManager support for disk io isolation through cgroups.
(Varun Vasudev and Wei Yan via vinodkv)
YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
(Jonathan Eagles via zjshen)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -129,6 +129,12 @@ public class TimelinePutResponse {
*/
public static final int FORBIDDEN_RELATION = 6;
/**
* Error code returned if the entity start time is before the eviction
* period of old data.
*/
public static final int EXPIRED_ENTITY = 7;
private String entityId;
private String entityType;
private int errorCode;

View File

@ -1431,6 +1431,18 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
1000 * 60 * 60 * 24 * 7;
/** Timeline service rolling period. Valid values are daily, half_daily,
* quarter_daily, and hourly. */
public static final String TIMELINE_SERVICE_ROLLING_PERIOD =
TIMELINE_SERVICE_PREFIX + "rolling-period";
/** Roll a new database each hour. */
public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD =
"hourly";
/** Implementation specific configuration prefix for Timeline Service
* leveldb.
*/
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
@ -1438,13 +1450,36 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
/** Timeline service leveldb read cache (uncompressed blocks) */
/** Timeline service leveldb read cache (uncompressed blocks). This is
* per rolling instance so should be tuned if using rolling leveldb
* timeline store */
public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
/** Default leveldb read cache size if no configuration is specified. */
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
100 * 1024 * 1024;
/** Timeline service leveldb write buffer size. */
public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size";
/** Default leveldb write buffer size if no configuration is specified. This
* is per rolling instance so should be tuned if using rolling leveldb
* timeline store. */
public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
16 * 1024 * 1024;
/** Timeline service leveldb write batch size. This value can be tuned down
* to reduce lock time for ttl eviction. */
public static final String
TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size";
/** Default leveldb write batch size is no configuration is specified */
public static final int
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000;
/** Timeline service leveldb start time read cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
@ -1468,6 +1503,16 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
1000 * 60 * 5;
/** Timeline service leveldb number of concurrent open files. Tuned this
* configuration to stay within system limits. This is per rolling instance
* so should be tuned if using rolling leveldb timeline store. */
public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files";
/** Default leveldb max open files if no configuration is specified. */
public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
1000;
/** The Kerberos principal for the timeline server.*/
public static final String TIMELINE_SERVICE_PRINCIPAL =
TIMELINE_SERVICE_PREFIX + "principal";

View File

@ -180,6 +180,11 @@
<artifactId>bcprov-jdk16</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.24</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,420 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.Map.Entry;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
/**
* Contains the logic to lookup a leveldb by timestamp so that multiple smaller
* databases can roll according to the configured period and evicted efficiently
* via operating system directory removal.
*/
class RollingLevelDB {
/** Logger for this class. */
private static final Log LOG = LogFactory.getLog(RollingLevelDB.class);
/** Factory to open and create new leveldb instances. */
private static JniDBFactory factory = new JniDBFactory();
/** Thread safe date formatter. */
private FastDateFormat fdf;
/** Date parser. */
private SimpleDateFormat sdf;
/** Calendar to calculate the current and next rolling period. */
private GregorianCalendar cal = new GregorianCalendar(
TimeZone.getTimeZone("GMT"));
/** Collection of all active rolling leveldb instances. */
private final TreeMap<Long, DB> rollingdbs;
/** Collection of all rolling leveldb instances to evict. */
private final TreeMap<Long, DB> rollingdbsToEvict;
/** Name of this rolling level db. */
private final String name;
/** Calculated timestamp of when to roll a new leveldb instance. */
private volatile long nextRollingCheckMillis = 0;
/** File system instance to find and create new leveldb instances. */
private FileSystem lfs = null;
/** Directory to store rolling leveldb instances. */
private Path rollingDBPath;
/** Configuration for this object. */
private Configuration conf;
/** Rolling period. */
private RollingPeriod rollingPeriod;
/**
* Rolling leveldb instances are evicted when their endtime is earlier than
* the current time minus the time to live value.
*/
private long ttl;
/** Whether time to live is enabled. */
private boolean ttlEnabled;
/** Encapsulates the rolling period to date format lookup. */
enum RollingPeriod {
DAILY {
@Override
public String dateFormat() {
return "yyyy-MM-dd";
}
},
HALF_DAILY {
@Override
public String dateFormat() {
return "yyyy-MM-dd-HH";
}
},
QUARTER_DAILY {
@Override
public String dateFormat() {
return "yyyy-MM-dd-HH";
}
},
HOURLY {
@Override
public String dateFormat() {
return "yyyy-MM-dd-HH";
}
},
MINUTELY {
@Override
public String dateFormat() {
return "yyyy-MM-dd-HH-mm";
}
};
public abstract String dateFormat();
}
/**
* Convenience class for associating a write batch with its rolling leveldb
* instance.
*/
public static class RollingWriteBatch {
/** Leveldb object. */
private final DB db;
/** Write batch for the db object. */
private final WriteBatch writeBatch;
public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
this.db = db;
this.writeBatch = writeBatch;
}
public DB getDB() {
return db;
}
public WriteBatch getWriteBatch() {
return writeBatch;
}
public void write() {
db.write(writeBatch);
}
public void close() {
IOUtils.cleanup(LOG, writeBatch);
}
}
RollingLevelDB(String name) {
this.name = name;
this.rollingdbs = new TreeMap<Long, DB>();
this.rollingdbsToEvict = new TreeMap<Long, DB>();
}
protected String getName() {
return name;
}
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
public long getNextRollingTimeMillis() {
return nextRollingCheckMillis;
}
public long getTimeToLive() {
return ttl;
}
public boolean getTimeToLiveEnabled() {
return ttlEnabled;
}
protected void setNextRollingTimeMillis(final long timestamp) {
this.nextRollingCheckMillis = timestamp;
LOG.info("Next rolling time for " + getName() + " is "
+ fdf.format(nextRollingCheckMillis));
}
public void init(final Configuration config) throws Exception {
LOG.info("Initializing RollingLevelDB for " + getName());
this.conf = config;
this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
this.ttlEnabled = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
this.rollingDBPath = new Path(
conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
RollingLevelDBTimelineStore.FILENAME);
initFileSystem();
initRollingPeriod();
initHistoricalDBs();
}
protected void initFileSystem() throws IOException {
lfs = FileSystem.getLocal(conf);
boolean success = lfs.mkdirs(rollingDBPath,
RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK);
if (!success) {
throw new IOException("Failed to create leveldb root directory "
+ rollingDBPath);
}
}
protected synchronized void initRollingPeriod() {
final String lcRollingPeriod = conf.get(
YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD);
this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod
.toUpperCase(Locale.ENGLISH));
fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(),
TimeZone.getTimeZone("GMT"));
sdf = new SimpleDateFormat(rollingPeriod.dateFormat());
sdf.setTimeZone(fdf.getTimeZone());
}
protected synchronized void initHistoricalDBs() throws IOException {
Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
for (FileStatus status : statuses) {
String dbName = FilenameUtils.getExtension(status.getPath().toString());
try {
Long dbStartTime = sdf.parse(dbName).getTime();
initRollingLevelDB(dbStartTime, status.getPath());
} catch (ParseException pe) {
LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
+ getName());
}
}
}
private void initRollingLevelDB(Long dbStartTime,
Path rollingInstanceDBPath) {
if (rollingdbs.containsKey(dbStartTime)) {
return;
}
Options options = new Options();
options.createIfMissing(true);
options.cacheSize(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
options.maxOpenFiles(conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
options.writeBufferSize(conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
+ " for start time: " + dbStartTime);
DB db = null;
try {
db = factory.open(
new File(rollingInstanceDBPath.toUri().getPath()), options);
rollingdbs.put(dbStartTime, db);
String dbName = fdf.format(dbStartTime);
LOG.info("Added rolling leveldb instance " + dbName + " to " + getName());
} catch (IOException ioe) {
LOG.warn("Failed to open rolling leveldb instance :"
+ new File(rollingInstanceDBPath.toUri().getPath()), ioe);
}
}
synchronized DB getPreviousDB(DB db) {
Iterator<DB> iterator = rollingdbs.values().iterator();
DB prev = null;
while (iterator.hasNext()) {
DB cur = iterator.next();
if (cur == db) {
break;
}
prev = cur;
}
return prev;
}
synchronized long getStartTimeFor(DB db) {
long startTime = -1;
for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) {
if (entry.getValue() == db) {
startTime = entry.getKey();
}
}
return startTime;
}
public synchronized DB getDBForStartTime(long startTime) {
// make sure we sanitize this input
startTime = Math.min(startTime, currentTimeMillis());
if (startTime >= getNextRollingTimeMillis()) {
roll(startTime);
}
Entry<Long, DB> entry = rollingdbs.floorEntry(startTime);
if (entry == null) {
return null;
}
return entry.getValue();
}
private void roll(long startTime) {
LOG.info("Rolling new DB instance for " + getName());
long currentStartTime = computeCurrentCheckMillis(startTime);
setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime));
String currentRollingDBInstance = fdf.format(currentStartTime);
String currentRollingDBName = getName() + "." + currentRollingDBInstance;
Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName);
if (getTimeToLiveEnabled()) {
scheduleOldDBsForEviction();
}
initRollingLevelDB(currentStartTime, currentRollingDBPath);
}
private synchronized void scheduleOldDBsForEviction() {
// keep at least time to live amount of data
long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
- getTimeToLive());
LOG.info("Scheduling " + getName() + " DBs older than "
+ fdf.format(evictionThreshold) + " for eviction");
Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Long, DB> entry = iterator.next();
// parse this in gmt time
if (entry.getKey() < evictionThreshold) {
LOG.info("Scheduling " + getName() + " eviction for "
+ fdf.format(entry.getKey()));
iterator.remove();
rollingdbsToEvict.put(entry.getKey(), entry.getValue());
}
}
}
public synchronized void evictOldDBs() {
LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet()
.iterator();
while (iterator.hasNext()) {
Entry<Long, DB> entry = iterator.next();
IOUtils.cleanup(LOG, entry.getValue());
String dbName = fdf.format(entry.getKey());
Path path = new Path(rollingDBPath, getName() + "." + dbName);
try {
LOG.info("Removing old db directory contents in " + path);
lfs.delete(path, true);
} catch (IOException ioe) {
LOG.warn("Failed to evict old db " + path, ioe);
}
iterator.remove();
}
}
public void stop() throws Exception {
for (DB db : rollingdbs.values()) {
IOUtils.cleanup(LOG, db);
}
IOUtils.cleanup(LOG, lfs);
}
private long computeNextCheckMillis(long now) {
return computeCheckMillis(now, true);
}
public long computeCurrentCheckMillis(long now) {
return computeCheckMillis(now, false);
}
private synchronized long computeCheckMillis(long now, boolean next) {
// needs to be called synchronously due to shared Calendar
cal.setTimeInMillis(now);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
if (rollingPeriod == RollingPeriod.DAILY) {
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
if (next) {
cal.add(Calendar.DATE, 1);
}
} else if (rollingPeriod == RollingPeriod.HALF_DAILY) {
// round down to 12 hour interval
int hour = (cal.get(Calendar.HOUR) / 12) * 12;
cal.set(Calendar.HOUR, hour);
cal.set(Calendar.MINUTE, 0);
if (next) {
cal.add(Calendar.HOUR_OF_DAY, 12);
}
} else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) {
// round down to 6 hour interval
int hour = (cal.get(Calendar.HOUR) / 6) * 6;
cal.set(Calendar.HOUR, hour);
cal.set(Calendar.MINUTE, 0);
if (next) {
cal.add(Calendar.HOUR_OF_DAY, 6);
}
} else if (rollingPeriod == RollingPeriod.HOURLY) {
cal.set(Calendar.MINUTE, 0);
if (next) {
cal.add(Calendar.HOUR_OF_DAY, 1);
}
} else if (rollingPeriod == RollingPeriod.MINUTELY) {
// round down to 5 minute interval
int minute = (cal.get(Calendar.MINUTE) / 5) * 5;
cal.set(Calendar.MINUTE, minute);
if (next) {
cal.add(Calendar.MINUTE, 5);
}
}
return cal.getTimeInMillis();
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.timeline;
import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
@ -51,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
* The class wrap over the timeline store and the ACLs manager. It does some non
* trivial manipulation of the timeline data before putting or after getting it
* from the timeline store, and checks the user's access to it.
*
*
*/
public class TimelineDataManager extends AbstractService {
@ -119,7 +116,7 @@ public class TimelineDataManager extends AbstractService {
* Get the timeline entities that the given user have access to. The meaning
* of each argument has been documented with
* {@link TimelineReader#getEntities}.
*
*
* @see TimelineReader#getEntities
*/
public TimelineEntities getEntities(
@ -156,7 +153,7 @@ public class TimelineDataManager extends AbstractService {
* Get the single timeline entity that the given user has access to. The
* meaning of each argument has been documented with
* {@link TimelineReader#getEntity}.
*
*
* @see TimelineReader#getEntity
*/
public TimelineEntity getEntity(
@ -182,7 +179,7 @@ public class TimelineDataManager extends AbstractService {
* Get the events whose entities the given user has access to. The meaning of
* each argument has been documented with
* {@link TimelineReader#getEntityTimelines}.
*
*
* @see TimelineReader#getEntityTimelines
*/
public TimelineEvents getEvents(
@ -218,7 +215,7 @@ public class TimelineDataManager extends AbstractService {
eventsItr.remove();
}
} catch (Exception e) {
LOG.error("Error when verifying access for user " + callerUGI
LOG.warn("Error when verifying access for user " + callerUGI
+ " on the events of the timeline entity "
+ new EntityIdentifier(eventsOfOneEntity.getEntityId(),
eventsOfOneEntity.getEntityType()), e);
@ -242,13 +239,10 @@ public class TimelineDataManager extends AbstractService {
if (entities == null) {
return new TimelinePutResponse();
}
List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
TimelineEntities entitiesToPut = new TimelineEntities();
List<TimelinePutResponse.TimelinePutError> errors =
new ArrayList<TimelinePutResponse.TimelinePutError>();
for (TimelineEntity entity : entities.getEntities()) {
EntityIdentifier entityID =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// if the domain id is not specified, the entity will be put into
// the default domain
@ -261,44 +255,42 @@ public class TimelineDataManager extends AbstractService {
TimelineEntity existingEntity = null;
try {
existingEntity =
store.getEntity(entityID.getId(), entityID.getType(),
store.getEntity(entity.getEntityId(), entity.getEntityType(),
EnumSet.of(Field.PRIMARY_FILTERS));
if (existingEntity != null) {
addDefaultDomainIdIfAbsent(existingEntity);
if (!existingEntity.getDomainId().equals(entity.getDomainId())) {
throw new YarnException("The domain of the timeline entity "
+ entityID + " is not allowed to be changed.");
+ "{ id: " + entity.getEntityId() + ", type: "
+ entity.getEntityType() + " } is not allowed to be changed from "
+ existingEntity.getDomainId() + " to " + entity.getDomainId());
}
}
if (!timelineACLsManager.checkAccess(
callerUGI, ApplicationAccessType.MODIFY_APP, entity)) {
throw new YarnException(callerUGI
+ " is not allowed to put the timeline entity " + entityID
+ " into the domain " + entity.getDomainId() + ".");
+ " is not allowed to put the timeline entity "
+ "{ id: " + entity.getEntityId() + ", type: "
+ entity.getEntityType() + " } into the domain "
+ entity.getDomainId() + ".");
}
} catch (Exception e) {
// Skip the entity which already exists and was put by others
LOG.error("Skip the timeline entity: " + entityID, e);
LOG.warn("Skip the timeline entity: { id: " + entity.getEntityId()
+ ", type: "+ entity.getEntityType() + " }", e);
TimelinePutResponse.TimelinePutError error =
new TimelinePutResponse.TimelinePutError();
error.setEntityId(entityID.getId());
error.setEntityType(entityID.getType());
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(
TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
errors.add(error);
continue;
}
entityIDs.add(entityID);
entitiesToPut.addEntity(entity);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
}
TimelinePutResponse response = store.put(entitiesToPut);
// add the errors of timeline system filter key conflict
response.addErrors(errors);

View File

@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.server.timeline.util;
import org.apache.hadoop.io.WritableComparator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
public class LeveldbUtils {
/** A string builder utility for building timeline server leveldb keys. */
public static class KeyBuilder {
/** Maximum subkeys that can be added to construct a key. */
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
private boolean[] useSeparator;
@ -47,8 +48,15 @@ public class LeveldbUtils {
return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
}
/** Instantiate a new key build with the given maximum subkes.
* @param size maximum subkeys that can be added to this key builder
* @return a newly constructed key builder */
public static KeyBuilder newInstance(final int size) {
return new KeyBuilder(size);
}
public KeyBuilder add(String s) {
return add(s.getBytes(Charset.forName("UTF-8")), true);
return add(s.getBytes(UTF_8), true);
}
public KeyBuilder add(byte[] t) {
@ -66,26 +74,37 @@ public class LeveldbUtils {
return this;
}
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
/** Builds a byte array without the final string delimiter. */
public byte[] getBytes() {
// check the last valid entry to see the final length
int bytesLength = length;
if (useSeparator[index - 1]) {
bytesLength = length - 1;
}
byte[] bytes = new byte[bytesLength];
int curPos = 0;
for (int i = 0; i < index; i++) {
baos.write(b[i]);
System.arraycopy(b[i], 0, bytes, curPos, b[i].length);
curPos += b[i].length;
if (i < index - 1 && useSeparator[i]) {
baos.write(0x0);
bytes[curPos++] = 0x0;
}
}
return baos.toByteArray();
return bytes;
}
public byte[] getBytesForLookup() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
/** Builds a byte array including the final string delimiter. */
public byte[] getBytesForLookup() {
byte[] bytes = new byte[length];
int curPos = 0;
for (int i = 0; i < index; i++) {
baos.write(b[i]);
System.arraycopy(b[i], 0, bytes, curPos, b[i].length);
curPos += b[i].length;
if (useSeparator[i]) {
baos.write(0x0);
bytes[curPos++] = 0x0;
}
}
return baos.toByteArray();
return bytes;
}
}
@ -93,11 +112,12 @@ public class LeveldbUtils {
private final byte[] b;
private int offset;
public KeyParser(byte[] b, int offset) {
public KeyParser(final byte[] b, final int offset) {
this.b = b;
this.offset = offset;
}
/** Returns a string from the offset until the next string delimiter. */
public String getNextString() throws IOException {
if (offset >= b.length) {
throw new IOException(
@ -107,23 +127,42 @@ public class LeveldbUtils {
while (offset + i < b.length && b[offset + i] != 0x0) {
i++;
}
String s = new String(b, offset, i, Charset.forName("UTF-8"));
String s = new String(b, offset, i, UTF_8);
offset = offset + i + 1;
return s;
}
/** Moves current position until after the next end of string marker. */
public void skipNextString() throws IOException {
if (offset >= b.length) {
throw new IOException("tried to read nonexistent string from byte array");
}
while (offset < b.length && b[offset] != 0x0) {
++offset;
}
++offset;
}
/** Read the next 8 bytes in the byte buffer as a long. */
public long getNextLong() throws IOException {
if (offset + 8 >= b.length) {
throw new IOException("byte array ran out when trying to read long");
}
long l = readReverseOrderedLong(b, offset);
long value = readReverseOrderedLong(b, offset);
offset += 8;
return l;
return value;
}
public int getOffset() {
return offset;
}
/** Returns a copy of the remaining bytes. */
public byte[] getRemainingBytes() {
byte[] bytes = new byte[b.length - offset];
System.arraycopy(b, offset, bytes, 0, b.length - offset);
return bytes;
}
}
/**

View File

@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.iq80.leveldb.DBException;
import org.junit.After;
@ -155,7 +153,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
iterator, pfIterator, false);
} catch(DBException e) {
throw new IOException(e);
throw new IOException(e);
} finally {
IOUtils.cleanup(null, iterator, pfIterator);
}
@ -179,12 +177,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
assertEquals(1, getEntities("type_2").size());
assertEquals(false, deleteNextEntity(entityType1,
writeReverseOrderedLong(60l)));
writeReverseOrderedLong(60L)));
assertEquals(3, getEntities("type_1").size());
assertEquals(1, getEntities("type_2").size());
assertEquals(true, deleteNextEntity(entityType1,
writeReverseOrderedLong(123l)));
writeReverseOrderedLong(123L)));
List<TimelineEntity> entities = getEntities("type_2");
assertEquals(1, entities.size());
verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
@ -198,12 +196,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1), domainId2);
((LeveldbTimelineStore)store).discardOldEntities(-123l);
((LeveldbTimelineStore)store).discardOldEntities(0L);
assertEquals(2, getEntities("type_1").size());
assertEquals(0, getEntities("type_2").size());
assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size());
((LeveldbTimelineStore)store).discardOldEntities(123l);
((LeveldbTimelineStore)store).discardOldEntities(123L);
assertEquals(0, getEntities("type_1").size());
assertEquals(0, getEntities("type_2").size());
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
@ -240,11 +238,11 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(2), domainId2);
((LeveldbTimelineStore)store).discardOldEntities(-123l);
((LeveldbTimelineStore)store).discardOldEntities(-123L);
assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
((LeveldbTimelineStore)store).discardOldEntities(123l);
((LeveldbTimelineStore)store).discardOldEntities(123L);
assertEquals(0, getEntities("type_1").size());
assertEquals(0, getEntities("type_2").size());
assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
@ -261,7 +259,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
assertEquals(1, getEntitiesFromTs("type_2", l).size());
assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
l).size());
((LeveldbTimelineStore)store).discardOldEntities(123l);
((LeveldbTimelineStore)store).discardOldEntities(123L);
assertEquals(0, getEntitiesFromTs("type_1", l).size());
assertEquals(0, getEntitiesFromTs("type_2", l).size());
assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
@ -279,7 +277,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
assertEquals(1, getEntities("type_2").size());
assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
}
@Test
public void testCheckVersion() throws IOException {
LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
@ -299,16 +297,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
Version incompatibleVersion = Version.newInstance(
defaultVersion.getMajorVersion() + 1, defaultVersion.getMinorVersion());
dbStore.storeVersion(incompatibleVersion);
try {
restartTimelineStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for timeline store"));
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for timeline store"));
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.iq80.leveldb.DB;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/** Test class for verification of RollingLevelDB. */
public class TestRollingLevelDB {
private Configuration conf = new YarnConfiguration();
private FileSystem lfs;
private MyRollingLevelDB rollingLevelDB;
/** RollingLevelDB for testing that has a setting current time. */
public static class MyRollingLevelDB extends RollingLevelDB {
private long currentTimeMillis;
MyRollingLevelDB() {
super("Test");
this.currentTimeMillis = System.currentTimeMillis();
}
@Override
protected long currentTimeMillis() {
return currentTimeMillis;
}
public void setCurrentTimeMillis(long time) {
this.currentTimeMillis = time;
}
};
@Before
public void setup() throws Exception {
lfs = FileSystem.getLocal(conf);
File fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath());
lfs.delete(new Path(fsPath.getAbsolutePath()), true);
rollingLevelDB = new MyRollingLevelDB();
}
@Test
public void testInsertAfterRollPeriodRollsDB() throws Exception {
rollingLevelDB.init(conf);
long now = rollingLevelDB.currentTimeMillis();
DB db = rollingLevelDB.getDBForStartTime(now);
long startTime = rollingLevelDB.getStartTimeFor(db);
Assert.assertEquals("Received level db for incorrect start time",
rollingLevelDB.computeCurrentCheckMillis(now),
startTime);
now = rollingLevelDB.getNextRollingTimeMillis();
rollingLevelDB.setCurrentTimeMillis(now);
db = rollingLevelDB.getDBForStartTime(now);
startTime = rollingLevelDB.getStartTimeFor(db);
Assert.assertEquals("Received level db for incorrect start time",
rollingLevelDB.computeCurrentCheckMillis(now),
startTime);
}
@Test
public void testInsertForPreviousPeriodAfterRollPeriodRollsDB()
throws Exception {
rollingLevelDB.init(conf);
long now = rollingLevelDB.currentTimeMillis();
now = rollingLevelDB.computeCurrentCheckMillis(now);
rollingLevelDB.setCurrentTimeMillis(now);
DB db = rollingLevelDB.getDBForStartTime(now - 1);
long startTime = rollingLevelDB.getStartTimeFor(db);
Assert.assertEquals("Received level db for incorrect start time",
rollingLevelDB.computeCurrentCheckMillis(now - 1),
startTime);
}
}

View File

@ -0,0 +1,427 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.log.Log;
/** Test class to verify RollingLevelDBTimelineStore. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
private Configuration config = new YarnConfiguration();
@Before
public void setup() throws Exception {
fsContext = FileContext.getLocalFSFileContext();
fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath());
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
store = new RollingLevelDBTimelineStore();
store.init(config);
store.start();
loadTestEntityData();
loadVerificationEntityData();
loadTestDomainData();
}
@After
public void tearDown() throws Exception {
store.stop();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
}
@Test
public void testRootDirPermission() throws IOException {
FileSystem fs = FileSystem.getLocal(new YarnConfiguration());
FileStatus file = fs.getFileStatus(new Path(fsPath.getAbsolutePath(),
RollingLevelDBTimelineStore.FILENAME));
assertNotNull(file);
assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK,
file.getPermission());
}
@Test
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
((RollingLevelDBTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
loadTestEntityData();
}
@Test
public void testGetEntities() throws IOException {
super.testGetEntities();
}
@Test
public void testGetEntitiesWithFromId() throws IOException {
super.testGetEntitiesWithFromId();
}
@Test
public void testGetEntitiesWithFromTs() throws IOException {
// feature not supported
}
@Test
public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters();
}
@Test
public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters();
}
@Test
public void testGetEvents() throws IOException {
super.testGetEvents();
}
@Test
public void testCacheSizes() {
Configuration conf = new Configuration();
assertEquals(10000,
RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
assertEquals(10000,
RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
conf.setInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
10001);
assertEquals(10001,
RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
conf = new Configuration();
conf.setInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
10002);
assertEquals(10002,
RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
}
@Test
public void testCheckVersion() throws IOException {
RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store;
// default version
Version defaultVersion = dbStore.getCurrentVersion();
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// compatible version
Version compatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
dbStore.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
restartTimelineStore();
dbStore = (RollingLevelDBTimelineStore) store;
// overwrite the compatible version
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
dbStore.storeVersion(incompatibleVersion);
try {
restartTimelineStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for timeline store"));
}
}
@Test
public void testValidateConfig() throws IOException {
Configuration copyConfig = new YarnConfiguration(config);
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration.TIMELINE_SERVICE_TTL_MS));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration
.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE));
}
try {
Configuration newConfig = new YarnConfiguration(copyConfig);
newConfig.setLong(
YarnConfiguration
.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
0);
config = newConfig;
restartTimelineStore();
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
YarnConfiguration
.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE));
}
config = copyConfig;
restartTimelineStore();
}
private void restartTimelineStore() throws IOException {
// need to close so leveldb releases database lock
if (store != null) {
store.close();
}
store = new RollingLevelDBTimelineStore();
store.init(config);
store.start();
}
@Test
public void testGetDomain() throws IOException {
super.testGetDomain();
}
@Test
public void testGetDomains() throws IOException {
super.testGetDomains();
}
@Test
public void testRelatingToNonExistingEntity() throws IOException {
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
entityToStore.setEntityId("TEST_ENTITY_ID_1");
entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
store.put(entities);
TimelineEntity entityToGet =
store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
Assert.assertEquals("TEST_ENTITY_TYPE_1",
entityToGet.getRelatedEntities().keySet().iterator().next());
Assert.assertEquals("TEST_ENTITY_ID_1",
entityToGet.getRelatedEntities().values().iterator().next()
.iterator().next());
}
@Test
public void testRelatingToEntityInSamePut() throws IOException {
TimelineEntity entityToRelate = new TimelineEntity();
entityToRelate.setEntityType("TEST_ENTITY_TYPE_2");
entityToRelate.setEntityId("TEST_ENTITY_ID_2");
entityToRelate.setDomainId("TEST_DOMAIN");
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
entityToStore.setEntityId("TEST_ENTITY_ID_1");
entityToStore.setDomainId("TEST_DOMAIN");
entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
entities.addEntity(entityToRelate);
store.put(entities);
TimelineEntity entityToGet =
store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId());
Assert.assertEquals("TEST_ENTITY_TYPE_1",
entityToGet.getRelatedEntities().keySet().iterator().next());
Assert.assertEquals("TEST_ENTITY_ID_1",
entityToGet.getRelatedEntities().values().iterator().next()
.iterator().next());
}
@Test
public void testRelatingToOldEntityWithoutDomainId() throws IOException {
// New entity is put in the default domain
TimelineEntity entityToStore = new TimelineEntity();
entityToStore.setEntityType("NEW_ENTITY_TYPE_1");
entityToStore.setEntityId("NEW_ENTITY_ID_1");
entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
store.put(entities);
TimelineEntity entityToGet =
store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
Assert.assertEquals("NEW_ENTITY_TYPE_1",
entityToGet.getRelatedEntities().keySet().iterator().next());
Assert.assertEquals("NEW_ENTITY_ID_1",
entityToGet.getRelatedEntities().values().iterator().next()
.iterator().next());
// New entity is not put in the default domain
entityToStore = new TimelineEntity();
entityToStore.setEntityType("NEW_ENTITY_TYPE_2");
entityToStore.setEntityId("NEW_ENTITY_ID_2");
entityToStore.setDomainId("NON_DEFAULT");
entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
entities = new TimelineEntities();
entities.addEntity(entityToStore);
TimelinePutResponse response = store.put(entities);
Assert.assertEquals(1, response.getErrors().size());
Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
response.getErrors().get(0).getErrorCode());
entityToGet =
store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
Assert.assertNotNull(entityToGet);
Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
// Still have one related entity
Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size());
Assert.assertEquals(1, entityToGet.getRelatedEntities().values()
.iterator().next().size());
}
public void testStorePerformance() throws IOException {
TimelineEntity entityToStorePrep = new TimelineEntity();
entityToStorePrep.setEntityType("TEST_ENTITY_TYPE_PREP");
entityToStorePrep.setEntityId("TEST_ENTITY_ID_PREP");
entityToStorePrep.setDomainId("TEST_DOMAIN");
entityToStorePrep.addRelatedEntity("TEST_ENTITY_TYPE_2",
"TEST_ENTITY_ID_2");
entityToStorePrep.setStartTime(0L);
TimelineEntities entitiesPrep = new TimelineEntities();
entitiesPrep.addEntity(entityToStorePrep);
store.put(entitiesPrep);
long start = System.currentTimeMillis();
int num = 1000000;
Log.info("Start test for " + num);
final String tezTaskAttemptId = "TEZ_TA";
final String tezEntityId = "attempt_1429158534256_0001_1_00_000000_";
final String tezTaskId = "TEZ_T";
final String tezDomainId = "Tez_ATS_application_1429158534256_0001";
TimelineEntity entityToStore = new TimelineEntity();
TimelineEvent startEvt = new TimelineEvent();
entityToStore.setEntityType(tezTaskAttemptId);
startEvt.setEventType("TASK_ATTEMPT_STARTED");
startEvt.setTimestamp(0);
entityToStore.addEvent(startEvt);
entityToStore.setDomainId(tezDomainId);
entityToStore.addPrimaryFilter("status", "SUCCEEDED");
entityToStore.addPrimaryFilter("applicationId",
"application_1429158534256_0001");
entityToStore.addPrimaryFilter("TEZ_VERTEX_ID",
"vertex_1429158534256_0001_1_00");
entityToStore.addPrimaryFilter("TEZ_DAG_ID", "dag_1429158534256_0001_1");
entityToStore.addPrimaryFilter("TEZ_TASK_ID",
"task_1429158534256_0001_1_00_000000");
entityToStore.setStartTime(0L);
entityToStore.addOtherInfo("startTime", 0);
entityToStore.addOtherInfo("inProgressLogsURL",
"localhost:8042/inProgressLogsURL");
entityToStore.addOtherInfo("completedLogsURL", "");
entityToStore.addOtherInfo("nodeId", "localhost:54450");
entityToStore.addOtherInfo("nodeHttpAddress", "localhost:8042");
entityToStore.addOtherInfo("containerId",
"container_1429158534256_0001_01_000002");
entityToStore.addOtherInfo("status", "RUNNING");
entityToStore.addRelatedEntity(tezTaskId, "TEZ_TASK_ID_1");
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entityToStore);
for (int i = 0; i < num; ++i) {
entityToStore.setEntityId(tezEntityId + i);
store.put(entities);
}
long duration = System.currentTimeMillis() - start;
Log.info("Duration for " + num + ": " + duration);
}
public static void main(String[] args) throws Exception {
TestRollingLevelDBTimelineStore store =
new TestRollingLevelDBTimelineStore();
store.setup();
store.testStorePerformance();
store.tearDown();
}
}

View File

@ -70,7 +70,7 @@ public class TimelineStoreTestUtils {
protected String entityId6;
protected String entityId7;
protected String entityType7;
protected Map<String, Set<Object>> primaryFilters;
protected Map<String, Object> secondaryFilters;
protected Map<String, Object> allFilters;
@ -105,7 +105,7 @@ public class TimelineStoreTestUtils {
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add((long)Integer.MAX_VALUE);
l2.add(Integer.MAX_VALUE);
Set<Object> l3 = new HashSet<Object>();
l3.add("123abc");
Set<Object> l4 = new HashSet<Object>();
@ -115,7 +115,7 @@ public class TimelineStoreTestUtils {
primaryFilters.put("other", l3);
primaryFilters.put("long", l4);
Map<String, Object> secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456l);
secondaryFilters.put("startTime", 123456);
secondaryFilters.put("status", "RUNNING");
Map<String, Object> otherInfo1 = new HashMap<String, Object>();
otherInfo1.put("info1", "val1");
@ -139,7 +139,7 @@ public class TimelineStoreTestUtils {
relatedEntities.put(entityType2, Collections.singleton(entityId2));
TimelineEvent ev3 = createEvent(789l, "launch_event", null);
TimelineEvent ev4 = createEvent(-123l, "init_event", null);
TimelineEvent ev4 = createEvent(0l, "init_event", null);
List<TimelineEvent> events = new ArrayList<TimelineEvent>();
events.add(ev3);
events.add(ev4);
@ -302,7 +302,7 @@ public class TimelineStoreTestUtils {
relEntityMap2.put(entityType4, Collections.singleton(entityId4));
ev3 = createEvent(789l, "launch_event", null);
ev4 = createEvent(-123l, "init_event", null);
ev4 = createEvent(0l, "init_event", null);
events2 = new ArrayList<TimelineEvent>();
events2.add(ev3);
events2.add(ev4);
@ -384,7 +384,7 @@ public class TimelineStoreTestUtils {
entityType1, EnumSet.allOf(Field.class)), domainId1);
verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 0l, store.getEntity(entityId2,
entityType2, EnumSet.allOf(Field.class)), domainId1);
verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,