YARN-4219. New levelDB cache storage for timeline v1.5. Contributed by

Li Lu

(cherry picked from commit 9fab22b366)
(cherry picked from commit 2f00d8d3a7)
This commit is contained in:
Xuan 2016-01-28 14:24:22 -08:00
parent 290b7db656
commit 353518f54c
12 changed files with 1128 additions and 468 deletions

View File

@ -214,6 +214,8 @@ Release 2.8.0 - UNRELEASED
YARN-4265. Provide new timeline plugin storage to support fine-grained entity YARN-4265. Provide new timeline plugin storage to support fine-grained entity
caching. (Li Lu and Jason Lowe via junping_du) caching. (Li Lu and Jason Lowe via junping_du)
YARN-4219. New levelDB cache storage for timeline v1.5. (Li Lu via xgong)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -1646,6 +1646,13 @@ private static void addDeprecatedKeys() {
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
"2000, 500"; "2000, 500";
public static final String TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
+ "leveldb-cache-read-cache-size";
public static final long
DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE = 10 * 1024 * 1024;
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS = public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs"; TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
public static final long public static final long

View File

@ -2036,6 +2036,14 @@
<value>604800</value> <value>604800</value>
</property> </property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name>
<description>
Read cache size for the leveldb cache storage in ATS v1.5 plugin storage.
</description>
<value>10485760</value>
</property>
<!-- Shared Cache Configuration --> <!-- Shared Cache Configuration -->
<property> <property>

View File

@ -0,0 +1,574 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
/**
* Map based implementation of {@link TimelineStore}. A hash map
* implementation should be connected to this implementation through a
* {@link TimelineStoreMapAdapter}.
*
* The methods are synchronized to avoid concurrent modifications.
*
*/
@Private
@Unstable
abstract class KeyValueBasedTimelineStore
extends AbstractService implements TimelineStore {
protected TimelineStoreMapAdapter<EntityIdentifier, TimelineEntity> entities;
protected TimelineStoreMapAdapter<EntityIdentifier, Long> entityInsertTimes;
protected TimelineStoreMapAdapter<String, TimelineDomain> domainById;
protected TimelineStoreMapAdapter<String, Set<TimelineDomain>> domainsByOwner;
private boolean serviceStopped = false;
private static final Log LOG
= LogFactory.getLog(KeyValueBasedTimelineStore.class);
public KeyValueBasedTimelineStore() {
super(KeyValueBasedTimelineStore.class.getName());
}
public KeyValueBasedTimelineStore(String name) {
super(name);
}
public synchronized boolean getServiceStopped() {
return serviceStopped;
}
@Override
protected synchronized void serviceStop() throws Exception {
serviceStopped = true;
super.serviceStop();
}
@Override
public synchronized TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
return null;
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
if (windowStart == null) {
windowStart = Long.MIN_VALUE;
}
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
Iterator<TimelineEntity> entityIterator = null;
if (fromId != null) {
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
entityType));
if (firstEntity == null) {
return new TimelineEntities();
} else {
entityIterator = entities.valueSetIterator(firstEntity);
}
}
if (entityIterator == null) {
entityIterator = entities.valueSetIterator();
}
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next();
if (entitiesSelected.size() >= limit) {
break;
}
if (!entity.getEntityType().equals(entityType)) {
continue;
}
if (entity.getStartTime() <= windowStart) {
continue;
}
if (entity.getStartTime() > windowEnd) {
continue;
}
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
entity.getEntityId(), entity.getEntityType())) > fromTs) {
continue;
}
if (primaryFilter != null &&
!KeyValueBasedTimelineStoreUtils.matchPrimaryFilter(
entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
&& !KeyValueBasedTimelineStoreUtils.matchFilter(
entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
}
if (!flag) {
continue;
}
}
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entitiesSelected.add(entity);
}
}
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
entitySelected, fields));
}
Collections.sort(entitiesToReturn);
TimelineEntities entitiesWrapper = new TimelineEntities();
entitiesWrapper.setEntities(entitiesToReturn);
return entitiesWrapper;
}
@Override
public synchronized TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) {
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
return null;
}
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
}
TimelineEntity
entity = entities.get(new EntityIdentifier(entityId, entityType));
if (entity == null) {
return null;
} else {
return KeyValueBasedTimelineStoreUtils.maskFields(
entity, fieldsToRetrieve);
}
}
@Override
public synchronized TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd,
Set<String> eventTypes) {
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
return null;
}
TimelineEvents allEvents = new TimelineEvents();
if (entityIds == null) {
return allEvents;
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
if (windowStart == null) {
windowStart = Long.MIN_VALUE;
}
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
for (String entityId : entityIds) {
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
TimelineEntity entity = entities.get(entityID);
if (entity == null) {
continue;
}
EventsOfOneEntity events = new EventsOfOneEntity();
events.setEntityId(entityId);
events.setEntityType(entityType);
for (TimelineEvent event : entity.getEvents()) {
if (events.getEvents().size() >= limit) {
break;
}
if (event.getTimestamp() <= windowStart) {
continue;
}
if (event.getTimestamp() > windowEnd) {
continue;
}
if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
continue;
}
events.addEvent(event);
}
allEvents.addEvent(events);
}
return allEvents;
}
@Override
public TimelineDomain getDomain(String domainId)
throws IOException {
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
return null;
}
TimelineDomain domain = domainById.get(domainId);
if (domain == null) {
return null;
} else {
return KeyValueBasedTimelineStoreUtils.createTimelineDomain(
domain.getId(),
domain.getDescription(),
domain.getOwner(),
domain.getReaders(),
domain.getWriters(),
domain.getCreatedTime(),
domain.getModifiedTime());
}
}
@Override
public TimelineDomains getDomains(String owner)
throws IOException {
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
return null;
}
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
if (domainsOfOneOwner == null) {
return new TimelineDomains();
}
for (TimelineDomain domain : domainsByOwner.get(owner)) {
TimelineDomain domainToReturn = KeyValueBasedTimelineStoreUtils
.createTimelineDomain(
domain.getId(),
domain.getDescription(),
domain.getOwner(),
domain.getReaders(),
domain.getWriters(),
domain.getCreatedTime(),
domain.getModifiedTime());
domains.add(domainToReturn);
}
Collections.sort(domains, new Comparator<TimelineDomain>() {
@Override
public int compare(
TimelineDomain domain1, TimelineDomain domain2) {
int result = domain2.getCreatedTime().compareTo(
domain1.getCreatedTime());
if (result == 0) {
return domain2.getModifiedTime().compareTo(
domain1.getModifiedTime());
} else {
return result;
}
}
});
TimelineDomains domainsToReturn = new TimelineDomains();
domainsToReturn.addDomains(domains);
return domainsToReturn;
}
@Override
public synchronized TimelinePutResponse put(TimelineEntities data) {
TimelinePutResponse response = new TimelinePutResponse();
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
TimelinePutError error = new TimelinePutError();
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error);
return response;
}
for (TimelineEntity entity : data.getEntities()) {
EntityIdentifier entityId =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// store entity info in memory
TimelineEntity existingEntity = entities.get(entityId);
boolean needsPut = false;
if (existingEntity == null) {
existingEntity = new TimelineEntity();
existingEntity.setEntityId(entity.getEntityId());
existingEntity.setEntityType(entity.getEntityType());
existingEntity.setStartTime(entity.getStartTime());
if (entity.getDomainId() == null ||
entity.getDomainId().length() == 0) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(TimelinePutError.NO_DOMAIN);
response.addError(error);
continue;
}
existingEntity.setDomainId(entity.getDomainId());
// insert a new entity to the storage, update insert time map
entityInsertTimes.put(entityId, System.currentTimeMillis());
needsPut = true;
}
if (entity.getEvents() != null) {
if (existingEntity.getEvents() == null) {
existingEntity.setEvents(entity.getEvents());
} else {
existingEntity.addEvents(entity.getEvents());
}
Collections.sort(existingEntity.getEvents());
needsPut = true;
}
// check startTime
if (existingEntity.getStartTime() == null) {
if (existingEntity.getEvents() == null
|| existingEntity.getEvents().isEmpty()) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(TimelinePutError.NO_START_TIME);
response.addError(error);
entities.remove(entityId);
entityInsertTimes.remove(entityId);
continue;
} else {
Long min = Long.MAX_VALUE;
for (TimelineEvent e : entity.getEvents()) {
if (min > e.getTimestamp()) {
min = e.getTimestamp();
}
}
existingEntity.setStartTime(min);
needsPut = true;
}
}
if (entity.getPrimaryFilters() != null) {
if (existingEntity.getPrimaryFilters() == null) {
existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
}
for (Entry<String, Set<Object>> pf :
entity.getPrimaryFilters().entrySet()) {
for (Object pfo : pf.getValue()) {
existingEntity.addPrimaryFilter(pf.getKey(),
KeyValueBasedTimelineStoreUtils.compactNumber(pfo));
needsPut = true;
}
}
}
if (entity.getOtherInfo() != null) {
if (existingEntity.getOtherInfo() == null) {
existingEntity.setOtherInfo(new HashMap<String, Object>());
}
for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
existingEntity.addOtherInfo(info.getKey(),
KeyValueBasedTimelineStoreUtils.compactNumber(info.getValue()));
needsPut = true;
}
}
if (needsPut) {
entities.put(entityId, existingEntity);
}
// relate it to other entities
if (entity.getRelatedEntities() == null) {
continue;
}
for (Entry<String, Set<String>> partRelatedEntities : entity
.getRelatedEntities().entrySet()) {
if (partRelatedEntities == null) {
continue;
}
for (String idStr : partRelatedEntities.getValue()) {
EntityIdentifier relatedEntityId =
new EntityIdentifier(idStr, partRelatedEntities.getKey());
TimelineEntity relatedEntity = entities.get(relatedEntityId);
if (relatedEntity != null) {
if (relatedEntity.getDomainId().equals(
existingEntity.getDomainId())) {
relatedEntity.addRelatedEntity(
existingEntity.getEntityType(), existingEntity.getEntityId());
entities.put(relatedEntityId, relatedEntity);
} else {
// in this case the entity will be put, but the relation will be
// ignored
TimelinePutError error = new TimelinePutError();
error.setEntityType(existingEntity.getEntityType());
error.setEntityId(existingEntity.getEntityId());
error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
response.addError(error);
}
} else {
relatedEntity = new TimelineEntity();
relatedEntity.setEntityId(relatedEntityId.getId());
relatedEntity.setEntityType(relatedEntityId.getType());
relatedEntity.setStartTime(existingEntity.getStartTime());
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
existingEntity.getEntityId());
relatedEntity.setDomainId(existingEntity.getDomainId());
entities.put(relatedEntityId, relatedEntity);
entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
}
}
}
}
return response;
}
public void put(TimelineDomain domain) throws IOException {
if (getServiceStopped()) {
LOG.info("Service stopped, return null for the storage");
return;
}
TimelineDomain domainToReplace =
domainById.get(domain.getId());
Long currentTimestamp = System.currentTimeMillis();
TimelineDomain domainToStore
= KeyValueBasedTimelineStoreUtils.createTimelineDomain(
domain.getId(), domain.getDescription(), domain.getOwner(),
domain.getReaders(), domain.getWriters(),
(domainToReplace == null ?
currentTimestamp : domainToReplace.getCreatedTime()),
currentTimestamp);
domainById.put(domainToStore.getId(), domainToStore);
Set<TimelineDomain> domainsByOneOwner =
domainsByOwner.get(domainToStore.getOwner());
if (domainsByOneOwner == null) {
domainsByOneOwner = new HashSet<TimelineDomain>();
domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
}
if (domainToReplace != null) {
domainsByOneOwner.remove(domainToReplace);
}
domainsByOneOwner.add(domainToStore);
}
private static class KeyValueBasedTimelineStoreUtils {
static TimelineDomain createTimelineDomain(
String id, String description, String owner,
String readers, String writers,
Long createdTime, Long modifiedTime) {
TimelineDomain domainToStore = new TimelineDomain();
domainToStore.setId(id);
domainToStore.setDescription(description);
domainToStore.setOwner(owner);
domainToStore.setReaders(readers);
domainToStore.setWriters(writers);
domainToStore.setCreatedTime(createdTime);
domainToStore.setModifiedTime(modifiedTime);
return domainToStore;
}
static TimelineEntity maskFields(
TimelineEntity entity, EnumSet<Field> fields) {
// Conceal the fields that are not going to be exposed
TimelineEntity entityToReturn = new TimelineEntity();
entityToReturn.setEntityId(entity.getEntityId());
entityToReturn.setEntityType(entity.getEntityType());
entityToReturn.setStartTime(entity.getStartTime());
entityToReturn.setDomainId(entity.getDomainId());
// Deep copy
if (fields.contains(Field.EVENTS)) {
entityToReturn.addEvents(entity.getEvents());
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
entityToReturn.addEvent(entity.getEvents().get(0));
} else {
entityToReturn.setEvents(null);
}
if (fields.contains(Field.RELATED_ENTITIES)) {
entityToReturn.addRelatedEntities(entity.getRelatedEntities());
} else {
entityToReturn.setRelatedEntities(null);
}
if (fields.contains(Field.PRIMARY_FILTERS)) {
entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
} else {
entityToReturn.setPrimaryFilters(null);
}
if (fields.contains(Field.OTHER_INFO)) {
entityToReturn.addOtherInfo(entity.getOtherInfo());
} else {
entityToReturn.setOtherInfo(null);
}
return entityToReturn;
}
static boolean matchFilter(Map<String, Object> tags,
NameValuePair filter) {
Object value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else if (!value.equals(filter.getValue())) { // doesn't match the filter
return false;
}
return true;
}
static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
NameValuePair filter) {
Set<Object> value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else {
return value.contains(filter.getValue());
}
}
static Object compactNumber(Object o) {
if (o instanceof Long) {
Long l = (Long) o;
if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
return l.intValue();
}
}
return o;
}
}
}

View File

@ -18,38 +18,13 @@
package org.apache.hadoop.yarn.server.timeline; package org.apache.hadoop.yarn.server.timeline;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
/** /**
* In-memory implementation of {@link TimelineStore}. This * In-memory implementation of {@link TimelineStore}. This
@ -62,448 +37,60 @@
*/ */
@Private @Private
@Unstable @Unstable
public class MemoryTimelineStore public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
extends AbstractService implements TimelineStore {
private Map<EntityIdentifier, TimelineEntity> entities = static class HashMapStoreAdapter<K, V>
new HashMap<EntityIdentifier, TimelineEntity>(); implements TimelineStoreMapAdapter<K, V> {
private Map<EntityIdentifier, Long> entityInsertTimes = Map<K, V> internalMap = new HashMap<>();
new HashMap<EntityIdentifier, Long>();
private Map<String, TimelineDomain> domainsById = @Override
new HashMap<String, TimelineDomain>(); public V get(K key) {
private Map<String, Set<TimelineDomain>> domainsByOwner = return internalMap.get(key);
new HashMap<String, Set<TimelineDomain>>(); }
@Override
public void put(K key, V value) {
internalMap.put(key, value);
}
@Override
public void remove(K key) {
internalMap.remove(key);
}
@Override
public Iterator<V>
valueSetIterator() {
return new TreeSet<>(internalMap.values()).iterator();
}
@Override
@SuppressWarnings("unchecked")
public Iterator<V> valueSetIterator(V minV) {
if (minV instanceof Comparable) {
TreeSet<V> tempTreeSet = new TreeSet<>();
for (V value : internalMap.values()) {
if (((Comparable) value).compareTo(minV) >= 0) {
tempTreeSet.add(value);
}
}
return tempTreeSet.iterator();
} else {
return valueSetIterator();
}
}
}
public MemoryTimelineStore() { public MemoryTimelineStore() {
super(MemoryTimelineStore.class.getName()); this(MemoryTimelineStore.class.getName());
} }
@Override public MemoryTimelineStore(String name) {
public synchronized TimelineEntities getEntities(String entityType, Long limit, super(name);
Long windowStart, Long windowEnd, String fromId, Long fromTs, entities = new HashMapStoreAdapter<>();
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, entityInsertTimes = new HashMapStoreAdapter<>();
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException { domainById = new HashMapStoreAdapter<>();
if (limit == null) { domainsByOwner = new HashMapStoreAdapter<>();
limit = DEFAULT_LIMIT;
}
if (windowStart == null) {
windowStart = Long.MIN_VALUE;
}
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
Iterator<TimelineEntity> entityIterator = null;
if (fromId != null) {
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
entityType));
if (firstEntity == null) {
return new TimelineEntities();
} else {
entityIterator = new TreeSet<TimelineEntity>(entities.values())
.tailSet(firstEntity, true).iterator();
}
}
if (entityIterator == null) {
entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
.iterator();
}
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next();
if (entitiesSelected.size() >= limit) {
break;
}
if (!entity.getEntityType().equals(entityType)) {
continue;
}
if (entity.getStartTime() <= windowStart) {
continue;
}
if (entity.getStartTime() > windowEnd) {
continue;
}
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
entity.getEntityId(), entity.getEntityType())) > fromTs) {
continue;
}
if (primaryFilter != null &&
!matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
continue;
}
if (secondaryFilters != null) { // AND logic
boolean flag = true;
for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !matchPrimaryFilter(
entity.getPrimaryFilters(), secondaryFilter) &&
!matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = false;
break;
}
}
if (!flag) {
continue;
}
}
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entitiesSelected.add(entity);
}
}
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(maskFields(entitySelected, fields));
}
Collections.sort(entitiesToReturn);
TimelineEntities entitiesWrapper = new TimelineEntities();
entitiesWrapper.setEntities(entitiesToReturn);
return entitiesWrapper;
}
@Override
public synchronized TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
}
TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
if (entity == null) {
return null;
} else {
return maskFields(entity, fieldsToRetrieve);
}
}
@Override
public synchronized TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd,
Set<String> eventTypes) {
TimelineEvents allEvents = new TimelineEvents();
if (entityIds == null) {
return allEvents;
}
if (limit == null) {
limit = DEFAULT_LIMIT;
}
if (windowStart == null) {
windowStart = Long.MIN_VALUE;
}
if (windowEnd == null) {
windowEnd = Long.MAX_VALUE;
}
for (String entityId : entityIds) {
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
TimelineEntity entity = entities.get(entityID);
if (entity == null) {
continue;
}
EventsOfOneEntity events = new EventsOfOneEntity();
events.setEntityId(entityId);
events.setEntityType(entityType);
for (TimelineEvent event : entity.getEvents()) {
if (events.getEvents().size() >= limit) {
break;
}
if (event.getTimestamp() <= windowStart) {
continue;
}
if (event.getTimestamp() > windowEnd) {
continue;
}
if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
continue;
}
events.addEvent(event);
}
allEvents.addEvent(events);
}
return allEvents;
}
@Override
public TimelineDomain getDomain(String domainId)
throws IOException {
TimelineDomain domain = domainsById.get(domainId);
if (domain == null) {
return null;
} else {
return createTimelineDomain(
domain.getId(),
domain.getDescription(),
domain.getOwner(),
domain.getReaders(),
domain.getWriters(),
domain.getCreatedTime(),
domain.getModifiedTime());
}
}
@Override
public TimelineDomains getDomains(String owner)
throws IOException {
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
if (domainsOfOneOwner == null) {
return new TimelineDomains();
}
for (TimelineDomain domain : domainsByOwner.get(owner)) {
TimelineDomain domainToReturn = createTimelineDomain(
domain.getId(),
domain.getDescription(),
domain.getOwner(),
domain.getReaders(),
domain.getWriters(),
domain.getCreatedTime(),
domain.getModifiedTime());
domains.add(domainToReturn);
}
Collections.sort(domains, new Comparator<TimelineDomain>() {
@Override
public int compare(
TimelineDomain domain1, TimelineDomain domain2) {
int result = domain2.getCreatedTime().compareTo(
domain1.getCreatedTime());
if (result == 0) {
return domain2.getModifiedTime().compareTo(
domain1.getModifiedTime());
} else {
return result;
}
}
});
TimelineDomains domainsToReturn = new TimelineDomains();
domainsToReturn.addDomains(domains);
return domainsToReturn;
}
@Override
public synchronized TimelinePutResponse put(TimelineEntities data) {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : data.getEntities()) {
EntityIdentifier entityId =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// store entity info in memory
TimelineEntity existingEntity = entities.get(entityId);
if (existingEntity == null) {
existingEntity = new TimelineEntity();
existingEntity.setEntityId(entity.getEntityId());
existingEntity.setEntityType(entity.getEntityType());
existingEntity.setStartTime(entity.getStartTime());
if (entity.getDomainId() == null ||
entity.getDomainId().length() == 0) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(TimelinePutError.NO_DOMAIN);
response.addError(error);
continue;
}
existingEntity.setDomainId(entity.getDomainId());
entities.put(entityId, existingEntity);
entityInsertTimes.put(entityId, System.currentTimeMillis());
}
if (entity.getEvents() != null) {
if (existingEntity.getEvents() == null) {
existingEntity.setEvents(entity.getEvents());
} else {
existingEntity.addEvents(entity.getEvents());
}
Collections.sort(existingEntity.getEvents());
}
// check startTime
if (existingEntity.getStartTime() == null) {
if (existingEntity.getEvents() == null
|| existingEntity.getEvents().isEmpty()) {
TimelinePutError error = new TimelinePutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(TimelinePutError.NO_START_TIME);
response.addError(error);
entities.remove(entityId);
entityInsertTimes.remove(entityId);
continue;
} else {
Long min = Long.MAX_VALUE;
for (TimelineEvent e : entity.getEvents()) {
if (min > e.getTimestamp()) {
min = e.getTimestamp();
}
}
existingEntity.setStartTime(min);
}
}
if (entity.getPrimaryFilters() != null) {
if (existingEntity.getPrimaryFilters() == null) {
existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
}
for (Entry<String, Set<Object>> pf :
entity.getPrimaryFilters().entrySet()) {
for (Object pfo : pf.getValue()) {
existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
}
}
}
if (entity.getOtherInfo() != null) {
if (existingEntity.getOtherInfo() == null) {
existingEntity.setOtherInfo(new HashMap<String, Object>());
}
for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
existingEntity.addOtherInfo(info.getKey(),
maybeConvert(info.getValue()));
}
}
// relate it to other entities
if (entity.getRelatedEntities() == null) {
continue;
}
for (Map.Entry<String, Set<String>> partRelatedEntities : entity
.getRelatedEntities().entrySet()) {
if (partRelatedEntities == null) {
continue;
}
for (String idStr : partRelatedEntities.getValue()) {
EntityIdentifier relatedEntityId =
new EntityIdentifier(idStr, partRelatedEntities.getKey());
TimelineEntity relatedEntity = entities.get(relatedEntityId);
if (relatedEntity != null) {
if (relatedEntity.getDomainId().equals(
existingEntity.getDomainId())) {
relatedEntity.addRelatedEntity(
existingEntity.getEntityType(), existingEntity.getEntityId());
} else {
// in this case the entity will be put, but the relation will be
// ignored
TimelinePutError error = new TimelinePutError();
error.setEntityType(existingEntity.getEntityType());
error.setEntityId(existingEntity.getEntityId());
error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
response.addError(error);
}
} else {
relatedEntity = new TimelineEntity();
relatedEntity.setEntityId(relatedEntityId.getId());
relatedEntity.setEntityType(relatedEntityId.getType());
relatedEntity.setStartTime(existingEntity.getStartTime());
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
existingEntity.getEntityId());
relatedEntity.setDomainId(existingEntity.getDomainId());
entities.put(relatedEntityId, relatedEntity);
entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
}
}
}
}
return response;
}
public void put(TimelineDomain domain) throws IOException {
TimelineDomain domainToReplace =
domainsById.get(domain.getId());
Long currentTimestamp = System.currentTimeMillis();
TimelineDomain domainToStore = createTimelineDomain(
domain.getId(), domain.getDescription(), domain.getOwner(),
domain.getReaders(), domain.getWriters(),
(domainToReplace == null ?
currentTimestamp : domainToReplace.getCreatedTime()),
currentTimestamp);
domainsById.put(domainToStore.getId(), domainToStore);
Set<TimelineDomain> domainsByOneOwner =
domainsByOwner.get(domainToStore.getOwner());
if (domainsByOneOwner == null) {
domainsByOneOwner = new HashSet<TimelineDomain>();
domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
}
if (domainToReplace != null) {
domainsByOneOwner.remove(domainToReplace);
}
domainsByOneOwner.add(domainToStore);
}
private static TimelineDomain createTimelineDomain(
String id, String description, String owner,
String readers, String writers,
Long createdTime, Long modifiedTime) {
TimelineDomain domainToStore = new TimelineDomain();
domainToStore.setId(id);
domainToStore.setDescription(description);
domainToStore.setOwner(owner);
domainToStore.setReaders(readers);
domainToStore.setWriters(writers);
domainToStore.setCreatedTime(createdTime);
domainToStore.setModifiedTime(modifiedTime);
return domainToStore;
}
private static TimelineEntity maskFields(
TimelineEntity entity, EnumSet<Field> fields) {
// Conceal the fields that are not going to be exposed
TimelineEntity entityToReturn = new TimelineEntity();
entityToReturn.setEntityId(entity.getEntityId());
entityToReturn.setEntityType(entity.getEntityType());
entityToReturn.setStartTime(entity.getStartTime());
entityToReturn.setDomainId(entity.getDomainId());
// Deep copy
if (fields.contains(Field.EVENTS)) {
entityToReturn.addEvents(entity.getEvents());
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
entityToReturn.addEvent(entity.getEvents().get(0));
} else {
entityToReturn.setEvents(null);
}
if (fields.contains(Field.RELATED_ENTITIES)) {
entityToReturn.addRelatedEntities(entity.getRelatedEntities());
} else {
entityToReturn.setRelatedEntities(null);
}
if (fields.contains(Field.PRIMARY_FILTERS)) {
entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
} else {
entityToReturn.setPrimaryFilters(null);
}
if (fields.contains(Field.OTHER_INFO)) {
entityToReturn.addOtherInfo(entity.getOtherInfo());
} else {
entityToReturn.setOtherInfo(null);
}
return entityToReturn;
}
private static boolean matchFilter(Map<String, Object> tags,
NameValuePair filter) {
Object value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else if (!value.equals(filter.getValue())) { // doesn't match the filter
return false;
}
return true;
}
private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
NameValuePair filter) {
Set<Object> value = tags.get(filter.getName());
if (value == null) { // doesn't have the filter
return false;
} else {
return value.contains(filter.getValue());
}
}
private static Object maybeConvert(Object o) {
if (o instanceof Long) {
Long l = (Long)o;
if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
return l.intValue();
}
}
return o;
} }
} }

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import java.util.Iterator;
/**
* An adapter for map timeline store implementations
* @param <K> the type of the key set
* @param <V> the type of the value set
*/
interface TimelineStoreMapAdapter<K, V> {
/**
* @param key
* @return map(key)
*/
V get(K key);
/**
* Add mapping key->value in the map
* @param key
* @param value
*/
void put(K key, V value);
/**
* Remove mapping with key keyToRemove
* @param keyToRemove
*/
void remove(K keyToRemove);
/**
* @return the iterator of the value set of the map
*/
Iterator<V> valueSetIterator();
/**
* Return the iterator of the value set of the map, starting from minV if type
* V is comparable.
* @param minV
* @return
*/
Iterator<V> valueSetIterator(V minV);
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.timeline.util; package org.apache.hadoop.yarn.server.timeline.util;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import java.io.IOException; import java.io.IOException;
@ -177,4 +178,10 @@ public static boolean prefixMatches(byte[] prefix, int prefixlen,
prefixlen) == 0; prefixlen) == 0;
} }
/**
* Default permission mask for the level db dir
*/
public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
.createImmutable((short) 0700);
} }

View File

@ -132,5 +132,9 @@
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -102,7 +102,8 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
} }
if (!appLogs.getDetailLogs().isEmpty()) { if (!appLogs.getDetailLogs().isEmpty()) {
if (store == null) { if (store == null) {
store = new MemoryTimelineStore(); store = new LevelDBCacheTimelineStore(groupId.toString(),
"LeveldbCache." + groupId);
store.init(config); store.init(config);
store.start(); store.start();
} }

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
/**
* LevelDB implementation of {@link KeyValueBasedTimelineStore}. This
* implementation stores the entity hash map into a LevelDB instance.
* There are two partitions of the key space. One partition is to store a
* entity id to start time mapping:
*
* i!ENTITY_ID!ENTITY_TYPE -> ENTITY_START_TIME
*
* The other partition is to store the actual data:
*
* e!START_TIME!ENTITY_ID!ENTITY_TYPE -> ENTITY_BYTES
*
* This storage does not have any garbage collection mechanism, and is designed
* mainly for caching usages.
*/
@Private
@Unstable
public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
private static final Log LOG
= LogFactory.getLog(LevelDBCacheTimelineStore.class);
private static final String CACHED_LDB_FILE_PREFIX = "-timeline-cache.ldb";
private String dbId;
private DB entityDb;
private Configuration configuration;
public LevelDBCacheTimelineStore(String id, String name) {
super(name);
dbId = id;
entityInsertTimes = new MemoryTimelineStore.HashMapStoreAdapter<>();
domainById = new MemoryTimelineStore.HashMapStoreAdapter<>();
domainsByOwner = new MemoryTimelineStore.HashMapStoreAdapter<>();
}
public LevelDBCacheTimelineStore(String id) {
this(id, LevelDBCacheTimelineStore.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
configuration = conf;
Options options = new Options();
options.createIfMissing(true);
options.cacheSize(conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE,
YarnConfiguration.
DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE));
JniDBFactory factory = new JniDBFactory();
Path dbPath = new Path(
conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
dbId + CACHED_LDB_FILE_PREFIX);
FileSystem localFS = null;
try {
localFS = FileSystem.getLocal(conf);
if (!localFS.exists(dbPath)) {
if (!localFS.mkdirs(dbPath)) {
throw new IOException("Couldn't create directory for leveldb " +
"timeline store " + dbPath);
}
localFS.setPermission(dbPath, LeveldbUtils.LEVELDB_DIR_UMASK);
}
} finally {
IOUtils.cleanup(LOG, localFS);
}
LOG.info("Using leveldb path " + dbPath);
entityDb = factory.open(new File(dbPath.toString()), options);
entities = new LevelDBMapAdapter<>(entityDb);
super.serviceInit(conf);
}
@Override
protected synchronized void serviceStop() throws Exception {
IOUtils.cleanup(LOG, entityDb);
Path dbPath = new Path(
configuration.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
dbId + CACHED_LDB_FILE_PREFIX);
FileSystem localFS = null;
try {
localFS = FileSystem.getLocal(configuration);
if (!localFS.delete(dbPath, true)) {
throw new IOException("Couldn't delete data file for leveldb " +
"timeline store " + dbPath);
}
} finally {
IOUtils.cleanup(LOG, localFS);
}
super.serviceStop();
}
/**
* A specialized hash map storage that uses LevelDB for storing entity id to
* entity mappings.
*
* @param <K> an {@link EntityIdentifier} typed hash key
* @param <V> a {@link TimelineEntity} typed value
*/
static class LevelDBMapAdapter<K extends EntityIdentifier,
V extends TimelineEntity> implements TimelineStoreMapAdapter<K, V> {
private static final String TIME_INDEX_PREFIX = "i";
private static final String ENTITY_STORAGE_PREFIX = "e";
DB entityDb;
public LevelDBMapAdapter(DB currLevelDb) {
entityDb = currLevelDb;
}
@Override
public V get(K entityId) {
V result = null;
// Read the start time from the index
byte[] startTimeBytes = entityDb.get(getStartTimeKey(entityId));
if (startTimeBytes == null) {
return null;
}
// Build the key for the entity storage and read it
try {
result = getEntityForKey(getEntityKey(entityId, startTimeBytes));
} catch (IOException e) {
LOG.error("GenericObjectMapper cannot read key from key "
+ entityId.toString()
+ " into an object. Read aborted! ");
LOG.error(e.getMessage());
}
return result;
}
@Override
public void put(K entityId, V entity) {
Long startTime = entity.getStartTime();
if (startTime == null) {
startTime = System.currentTimeMillis();
}
// Build the key for the entity storage and read it
byte[] startTimeBytes = GenericObjectMapper.writeReverseOrderedLong(
startTime);
try {
byte[] valueBytes = GenericObjectMapper.write(entity);
entityDb.put(getEntityKey(entityId, startTimeBytes), valueBytes);
} catch (IOException e) {
LOG.error("GenericObjectMapper cannot write "
+ entity.getClass().getName()
+ " into a byte array. Write aborted! ");
LOG.error(e.getMessage());
}
// Build the key for the start time index
entityDb.put(getStartTimeKey(entityId), startTimeBytes);
}
@Override
public void remove(K entityId) {
// Read the start time from the index (key starts with an "i") then delete
// the record
LeveldbUtils.KeyBuilder startTimeKeyBuilder
= LeveldbUtils.KeyBuilder.newInstance();
startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
.add(entityId.getType());
byte[] startTimeBytes = entityDb.get(startTimeKeyBuilder.getBytes());
if (startTimeBytes == null) {
return;
}
entityDb.delete(startTimeKeyBuilder.getBytes());
// Build the key for the entity storage and delete it
entityDb.delete(getEntityKey(entityId, startTimeBytes));
}
@Override
public Iterator<V> valueSetIterator() {
return getIterator(null, Long.MAX_VALUE);
}
@Override
public Iterator<V> valueSetIterator(V minV) {
return getIterator(
new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
minV.getStartTime());
}
private Iterator<V> getIterator(
EntityIdentifier startId, long startTimeMax) {
final DBIterator internalDbIterator = entityDb.iterator();
// we need to iterate from the first element with key greater than or
// equal to ENTITY_STORAGE_PREFIX!maxTS(!startId), but stop on the first
// key who does not have prefix ENTITY_STORATE_PREFIX
// decide end prefix
LeveldbUtils.KeyBuilder entityPrefixKeyBuilder
= LeveldbUtils.KeyBuilder.newInstance();
entityPrefixKeyBuilder.add(ENTITY_STORAGE_PREFIX);
final byte[] prefixBytes = entityPrefixKeyBuilder.getBytesForLookup();
// decide start prefix on top of end prefix and seek
final byte[] startTimeBytes
= GenericObjectMapper.writeReverseOrderedLong(startTimeMax);
entityPrefixKeyBuilder.add(startTimeBytes, true);
if (startId != null) {
entityPrefixKeyBuilder.add(startId.getId());
}
final byte[] startPrefixBytes
= entityPrefixKeyBuilder.getBytesForLookup();
internalDbIterator.seek(startPrefixBytes);
return new Iterator<V>() {
@Override
public boolean hasNext() {
if (!internalDbIterator.hasNext()) {
return false;
}
Map.Entry<byte[], byte[]> nextEntry = internalDbIterator.peekNext();
if (LeveldbUtils.prefixMatches(
prefixBytes, prefixBytes.length, nextEntry.getKey())) {
return true;
}
return false;
}
@Override
public V next() {
if (hasNext()) {
Map.Entry<byte[], byte[]> nextRaw = internalDbIterator.next();
try {
V result = getEntityForKey(nextRaw.getKey());
return result;
} catch (IOException e) {
LOG.error("GenericObjectMapper cannot read key from key "
+ nextRaw.getKey()
+ " into an object. Read aborted! ");
LOG.error(e.getMessage());
}
}
return null;
}
// We do not support remove operations within one iteration
@Override
public void remove() {
LOG.error("LevelDB map adapter does not support iterate-and-remove"
+ " use cases. ");
}
};
}
@SuppressWarnings("unchecked")
private V getEntityForKey(byte[] key) throws IOException {
byte[] resultRaw = entityDb.get(key);
if (resultRaw == null) {
return null;
}
ObjectMapper entityMapper = new ObjectMapper();
return (V) entityMapper.readValue(resultRaw, TimelineEntity.class);
}
private byte[] getStartTimeKey(K entityId) {
LeveldbUtils.KeyBuilder startTimeKeyBuilder
= LeveldbUtils.KeyBuilder.newInstance();
startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
.add(entityId.getType());
return startTimeKeyBuilder.getBytes();
}
private byte[] getEntityKey(K entityId, byte[] startTimeBytes) {
LeveldbUtils.KeyBuilder entityKeyBuilder
= LeveldbUtils.KeyBuilder.newInstance();
entityKeyBuilder.add(ENTITY_STORAGE_PREFIX).add(startTimeBytes, true)
.add(entityId.getId()).add(entityId.getType());
return entityKeyBuilder.getBytes();
}
}
}

View File

@ -200,7 +200,7 @@ static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore s
} }
static TimelineDataManager getTdmWithMemStore(Configuration config) { static TimelineDataManager getTdmWithMemStore(Configuration config) {
TimelineStore store = new MemoryTimelineStore(); TimelineStore store = new MemoryTimelineStore("MemoryStore.test");
TimelineDataManager tdm = getTdmWithStore(config, store); TimelineDataManager tdm = getTdmWithStore(config, store);
return tdm; return tdm;
} }

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TestLevelDBCacheTimelineStore extends TimelineStoreTestUtils {
@Before
public void setup() throws Exception {
store = new LevelDBCacheTimelineStore("app1");
store.init(new YarnConfiguration());
store.start();
loadTestEntityData();
loadVerificationEntityData();
loadTestDomainData();
}
@After
public void tearDown() throws Exception {
store.stop();
}
public TimelineStore getTimelineStore() {
return store;
}
@Test
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
}
@Test
public void testGetEntities() throws IOException {
super.testGetEntities();
}
@Test
public void testGetEntitiesWithFromId() throws IOException {
super.testGetEntitiesWithFromId();
}
@Test
public void testGetEntitiesWithFromTs() throws IOException {
super.testGetEntitiesWithFromTs();
}
@Test
public void testGetEntitiesWithPrimaryFilters() throws IOException {
super.testGetEntitiesWithPrimaryFilters();
}
@Test
public void testGetEntitiesWithSecondaryFilters() throws IOException {
super.testGetEntitiesWithSecondaryFilters();
}
@Test
public void testGetEvents() throws IOException {
super.testGetEvents();
}
@Test
public void testGetDomain() throws IOException {
super.testGetDomain();
}
@Test
public void testGetDomains() throws IOException {
super.testGetDomains();
}
}