YARN-4219. New levelDB cache storage for timeline v1.5. Contributed by
Li Lu
(cherry picked from commit 9fab22b366
)
This commit is contained in:
parent
29195c7668
commit
2f00d8d3a7
|
@ -326,6 +326,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4265. Provide new timeline plugin storage to support fine-grained entity
|
YARN-4265. Provide new timeline plugin storage to support fine-grained entity
|
||||||
caching. (Li Lu and Jason Lowe via junping_du)
|
caching. (Li Lu and Jason Lowe via junping_du)
|
||||||
|
|
||||||
|
YARN-4219. New levelDB cache storage for timeline v1.5. (Li Lu via xgong)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-644. Basic null check is not performed on passed in arguments before
|
YARN-644. Basic null check is not performed on passed in arguments before
|
||||||
|
|
|
@ -1702,6 +1702,13 @@ public class YarnConfiguration extends Configuration {
|
||||||
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
|
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
|
||||||
"2000, 500";
|
"2000, 500";
|
||||||
|
|
||||||
|
public static final String TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE =
|
||||||
|
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX
|
||||||
|
+ "leveldb-cache-read-cache-size";
|
||||||
|
|
||||||
|
public static final long
|
||||||
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE = 10 * 1024 * 1024;
|
||||||
|
|
||||||
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
|
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
|
||||||
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
|
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
|
||||||
public static final long
|
public static final long
|
||||||
|
|
|
@ -2036,6 +2036,14 @@
|
||||||
<value>604800</value>
|
<value>604800</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name>
|
||||||
|
<description>
|
||||||
|
Read cache size for the leveldb cache storage in ATS v1.5 plugin storage.
|
||||||
|
</description>
|
||||||
|
<value>10485760</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!-- Shared Cache Configuration -->
|
<!-- Shared Cache Configuration -->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -0,0 +1,574 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map based implementation of {@link TimelineStore}. A hash map
|
||||||
|
* implementation should be connected to this implementation through a
|
||||||
|
* {@link TimelineStoreMapAdapter}.
|
||||||
|
*
|
||||||
|
* The methods are synchronized to avoid concurrent modifications.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
abstract class KeyValueBasedTimelineStore
|
||||||
|
extends AbstractService implements TimelineStore {
|
||||||
|
|
||||||
|
protected TimelineStoreMapAdapter<EntityIdentifier, TimelineEntity> entities;
|
||||||
|
protected TimelineStoreMapAdapter<EntityIdentifier, Long> entityInsertTimes;
|
||||||
|
protected TimelineStoreMapAdapter<String, TimelineDomain> domainById;
|
||||||
|
protected TimelineStoreMapAdapter<String, Set<TimelineDomain>> domainsByOwner;
|
||||||
|
|
||||||
|
private boolean serviceStopped = false;
|
||||||
|
|
||||||
|
private static final Log LOG
|
||||||
|
= LogFactory.getLog(KeyValueBasedTimelineStore.class);
|
||||||
|
|
||||||
|
public KeyValueBasedTimelineStore() {
|
||||||
|
super(KeyValueBasedTimelineStore.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
public KeyValueBasedTimelineStore(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean getServiceStopped() {
|
||||||
|
return serviceStopped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void serviceStop() throws Exception {
|
||||||
|
serviceStopped = true;
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized TimelineEntities getEntities(String entityType, Long limit,
|
||||||
|
Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||||
|
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||||
|
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (limit == null) {
|
||||||
|
limit = DEFAULT_LIMIT;
|
||||||
|
}
|
||||||
|
if (windowStart == null) {
|
||||||
|
windowStart = Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
if (windowEnd == null) {
|
||||||
|
windowEnd = Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
if (fields == null) {
|
||||||
|
fields = EnumSet.allOf(Field.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<TimelineEntity> entityIterator = null;
|
||||||
|
if (fromId != null) {
|
||||||
|
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
|
||||||
|
entityType));
|
||||||
|
if (firstEntity == null) {
|
||||||
|
return new TimelineEntities();
|
||||||
|
} else {
|
||||||
|
entityIterator = entities.valueSetIterator(firstEntity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (entityIterator == null) {
|
||||||
|
entityIterator = entities.valueSetIterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
|
||||||
|
while (entityIterator.hasNext()) {
|
||||||
|
TimelineEntity entity = entityIterator.next();
|
||||||
|
if (entitiesSelected.size() >= limit) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (!entity.getEntityType().equals(entityType)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (entity.getStartTime() <= windowStart) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (entity.getStartTime() > windowEnd) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
|
||||||
|
entity.getEntityId(), entity.getEntityType())) > fromTs) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (primaryFilter != null &&
|
||||||
|
!KeyValueBasedTimelineStoreUtils.matchPrimaryFilter(
|
||||||
|
entity.getPrimaryFilters(), primaryFilter)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (secondaryFilters != null) { // AND logic
|
||||||
|
boolean flag = true;
|
||||||
|
for (NameValuePair secondaryFilter : secondaryFilters) {
|
||||||
|
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
|
||||||
|
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
|
||||||
|
&& !KeyValueBasedTimelineStoreUtils.matchFilter(
|
||||||
|
entity.getOtherInfo(), secondaryFilter)) {
|
||||||
|
flag = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!flag) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (entity.getDomainId() == null) {
|
||||||
|
entity.setDomainId(DEFAULT_DOMAIN_ID);
|
||||||
|
}
|
||||||
|
if (checkAcl == null || checkAcl.check(entity)) {
|
||||||
|
entitiesSelected.add(entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
|
||||||
|
for (TimelineEntity entitySelected : entitiesSelected) {
|
||||||
|
entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
|
||||||
|
entitySelected, fields));
|
||||||
|
}
|
||||||
|
Collections.sort(entitiesToReturn);
|
||||||
|
TimelineEntities entitiesWrapper = new TimelineEntities();
|
||||||
|
entitiesWrapper.setEntities(entitiesToReturn);
|
||||||
|
return entitiesWrapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized TimelineEntity getEntity(String entityId, String entityType,
|
||||||
|
EnumSet<Field> fieldsToRetrieve) {
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (fieldsToRetrieve == null) {
|
||||||
|
fieldsToRetrieve = EnumSet.allOf(Field.class);
|
||||||
|
}
|
||||||
|
TimelineEntity
|
||||||
|
entity = entities.get(new EntityIdentifier(entityId, entityType));
|
||||||
|
if (entity == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return KeyValueBasedTimelineStoreUtils.maskFields(
|
||||||
|
entity, fieldsToRetrieve);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized TimelineEvents getEntityTimelines(String entityType,
|
||||||
|
SortedSet<String> entityIds, Long limit, Long windowStart,
|
||||||
|
Long windowEnd,
|
||||||
|
Set<String> eventTypes) {
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
TimelineEvents allEvents = new TimelineEvents();
|
||||||
|
if (entityIds == null) {
|
||||||
|
return allEvents;
|
||||||
|
}
|
||||||
|
if (limit == null) {
|
||||||
|
limit = DEFAULT_LIMIT;
|
||||||
|
}
|
||||||
|
if (windowStart == null) {
|
||||||
|
windowStart = Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
if (windowEnd == null) {
|
||||||
|
windowEnd = Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
for (String entityId : entityIds) {
|
||||||
|
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
|
||||||
|
TimelineEntity entity = entities.get(entityID);
|
||||||
|
if (entity == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
EventsOfOneEntity events = new EventsOfOneEntity();
|
||||||
|
events.setEntityId(entityId);
|
||||||
|
events.setEntityType(entityType);
|
||||||
|
for (TimelineEvent event : entity.getEvents()) {
|
||||||
|
if (events.getEvents().size() >= limit) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (event.getTimestamp() <= windowStart) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (event.getTimestamp() > windowEnd) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
events.addEvent(event);
|
||||||
|
}
|
||||||
|
allEvents.addEvent(events);
|
||||||
|
}
|
||||||
|
return allEvents;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDomain getDomain(String domainId)
|
||||||
|
throws IOException {
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
TimelineDomain domain = domainById.get(domainId);
|
||||||
|
if (domain == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return KeyValueBasedTimelineStoreUtils.createTimelineDomain(
|
||||||
|
domain.getId(),
|
||||||
|
domain.getDescription(),
|
||||||
|
domain.getOwner(),
|
||||||
|
domain.getReaders(),
|
||||||
|
domain.getWriters(),
|
||||||
|
domain.getCreatedTime(),
|
||||||
|
domain.getModifiedTime());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimelineDomains getDomains(String owner)
|
||||||
|
throws IOException {
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
||||||
|
Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
|
||||||
|
if (domainsOfOneOwner == null) {
|
||||||
|
return new TimelineDomains();
|
||||||
|
}
|
||||||
|
for (TimelineDomain domain : domainsByOwner.get(owner)) {
|
||||||
|
TimelineDomain domainToReturn = KeyValueBasedTimelineStoreUtils
|
||||||
|
.createTimelineDomain(
|
||||||
|
domain.getId(),
|
||||||
|
domain.getDescription(),
|
||||||
|
domain.getOwner(),
|
||||||
|
domain.getReaders(),
|
||||||
|
domain.getWriters(),
|
||||||
|
domain.getCreatedTime(),
|
||||||
|
domain.getModifiedTime());
|
||||||
|
domains.add(domainToReturn);
|
||||||
|
}
|
||||||
|
Collections.sort(domains, new Comparator<TimelineDomain>() {
|
||||||
|
@Override
|
||||||
|
public int compare(
|
||||||
|
TimelineDomain domain1, TimelineDomain domain2) {
|
||||||
|
int result = domain2.getCreatedTime().compareTo(
|
||||||
|
domain1.getCreatedTime());
|
||||||
|
if (result == 0) {
|
||||||
|
return domain2.getModifiedTime().compareTo(
|
||||||
|
domain1.getModifiedTime());
|
||||||
|
} else {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
TimelineDomains domainsToReturn = new TimelineDomains();
|
||||||
|
domainsToReturn.addDomains(domains);
|
||||||
|
return domainsToReturn;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized TimelinePutResponse put(TimelineEntities data) {
|
||||||
|
TimelinePutResponse response = new TimelinePutResponse();
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
TimelinePutError error = new TimelinePutError();
|
||||||
|
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
|
||||||
|
response.addError(error);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
for (TimelineEntity entity : data.getEntities()) {
|
||||||
|
EntityIdentifier entityId =
|
||||||
|
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
|
||||||
|
// store entity info in memory
|
||||||
|
TimelineEntity existingEntity = entities.get(entityId);
|
||||||
|
boolean needsPut = false;
|
||||||
|
if (existingEntity == null) {
|
||||||
|
existingEntity = new TimelineEntity();
|
||||||
|
existingEntity.setEntityId(entity.getEntityId());
|
||||||
|
existingEntity.setEntityType(entity.getEntityType());
|
||||||
|
existingEntity.setStartTime(entity.getStartTime());
|
||||||
|
if (entity.getDomainId() == null ||
|
||||||
|
entity.getDomainId().length() == 0) {
|
||||||
|
TimelinePutError error = new TimelinePutError();
|
||||||
|
error.setEntityId(entityId.getId());
|
||||||
|
error.setEntityType(entityId.getType());
|
||||||
|
error.setErrorCode(TimelinePutError.NO_DOMAIN);
|
||||||
|
response.addError(error);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
existingEntity.setDomainId(entity.getDomainId());
|
||||||
|
// insert a new entity to the storage, update insert time map
|
||||||
|
entityInsertTimes.put(entityId, System.currentTimeMillis());
|
||||||
|
needsPut = true;
|
||||||
|
}
|
||||||
|
if (entity.getEvents() != null) {
|
||||||
|
if (existingEntity.getEvents() == null) {
|
||||||
|
existingEntity.setEvents(entity.getEvents());
|
||||||
|
} else {
|
||||||
|
existingEntity.addEvents(entity.getEvents());
|
||||||
|
}
|
||||||
|
Collections.sort(existingEntity.getEvents());
|
||||||
|
needsPut = true;
|
||||||
|
}
|
||||||
|
// check startTime
|
||||||
|
if (existingEntity.getStartTime() == null) {
|
||||||
|
if (existingEntity.getEvents() == null
|
||||||
|
|| existingEntity.getEvents().isEmpty()) {
|
||||||
|
TimelinePutError error = new TimelinePutError();
|
||||||
|
error.setEntityId(entityId.getId());
|
||||||
|
error.setEntityType(entityId.getType());
|
||||||
|
error.setErrorCode(TimelinePutError.NO_START_TIME);
|
||||||
|
response.addError(error);
|
||||||
|
entities.remove(entityId);
|
||||||
|
entityInsertTimes.remove(entityId);
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
Long min = Long.MAX_VALUE;
|
||||||
|
for (TimelineEvent e : entity.getEvents()) {
|
||||||
|
if (min > e.getTimestamp()) {
|
||||||
|
min = e.getTimestamp();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
existingEntity.setStartTime(min);
|
||||||
|
needsPut = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (entity.getPrimaryFilters() != null) {
|
||||||
|
if (existingEntity.getPrimaryFilters() == null) {
|
||||||
|
existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
|
||||||
|
}
|
||||||
|
for (Entry<String, Set<Object>> pf :
|
||||||
|
entity.getPrimaryFilters().entrySet()) {
|
||||||
|
for (Object pfo : pf.getValue()) {
|
||||||
|
existingEntity.addPrimaryFilter(pf.getKey(),
|
||||||
|
KeyValueBasedTimelineStoreUtils.compactNumber(pfo));
|
||||||
|
needsPut = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (entity.getOtherInfo() != null) {
|
||||||
|
if (existingEntity.getOtherInfo() == null) {
|
||||||
|
existingEntity.setOtherInfo(new HashMap<String, Object>());
|
||||||
|
}
|
||||||
|
for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
|
||||||
|
existingEntity.addOtherInfo(info.getKey(),
|
||||||
|
KeyValueBasedTimelineStoreUtils.compactNumber(info.getValue()));
|
||||||
|
needsPut = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (needsPut) {
|
||||||
|
entities.put(entityId, existingEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
// relate it to other entities
|
||||||
|
if (entity.getRelatedEntities() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (Entry<String, Set<String>> partRelatedEntities : entity
|
||||||
|
.getRelatedEntities().entrySet()) {
|
||||||
|
if (partRelatedEntities == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (String idStr : partRelatedEntities.getValue()) {
|
||||||
|
EntityIdentifier relatedEntityId =
|
||||||
|
new EntityIdentifier(idStr, partRelatedEntities.getKey());
|
||||||
|
TimelineEntity relatedEntity = entities.get(relatedEntityId);
|
||||||
|
if (relatedEntity != null) {
|
||||||
|
if (relatedEntity.getDomainId().equals(
|
||||||
|
existingEntity.getDomainId())) {
|
||||||
|
relatedEntity.addRelatedEntity(
|
||||||
|
existingEntity.getEntityType(), existingEntity.getEntityId());
|
||||||
|
entities.put(relatedEntityId, relatedEntity);
|
||||||
|
} else {
|
||||||
|
// in this case the entity will be put, but the relation will be
|
||||||
|
// ignored
|
||||||
|
TimelinePutError error = new TimelinePutError();
|
||||||
|
error.setEntityType(existingEntity.getEntityType());
|
||||||
|
error.setEntityId(existingEntity.getEntityId());
|
||||||
|
error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
|
||||||
|
response.addError(error);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
relatedEntity = new TimelineEntity();
|
||||||
|
relatedEntity.setEntityId(relatedEntityId.getId());
|
||||||
|
relatedEntity.setEntityType(relatedEntityId.getType());
|
||||||
|
relatedEntity.setStartTime(existingEntity.getStartTime());
|
||||||
|
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
|
||||||
|
existingEntity.getEntityId());
|
||||||
|
relatedEntity.setDomainId(existingEntity.getDomainId());
|
||||||
|
entities.put(relatedEntityId, relatedEntity);
|
||||||
|
entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TimelineDomain domain) throws IOException {
|
||||||
|
if (getServiceStopped()) {
|
||||||
|
LOG.info("Service stopped, return null for the storage");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TimelineDomain domainToReplace =
|
||||||
|
domainById.get(domain.getId());
|
||||||
|
Long currentTimestamp = System.currentTimeMillis();
|
||||||
|
TimelineDomain domainToStore
|
||||||
|
= KeyValueBasedTimelineStoreUtils.createTimelineDomain(
|
||||||
|
domain.getId(), domain.getDescription(), domain.getOwner(),
|
||||||
|
domain.getReaders(), domain.getWriters(),
|
||||||
|
(domainToReplace == null ?
|
||||||
|
currentTimestamp : domainToReplace.getCreatedTime()),
|
||||||
|
currentTimestamp);
|
||||||
|
domainById.put(domainToStore.getId(), domainToStore);
|
||||||
|
Set<TimelineDomain> domainsByOneOwner =
|
||||||
|
domainsByOwner.get(domainToStore.getOwner());
|
||||||
|
if (domainsByOneOwner == null) {
|
||||||
|
domainsByOneOwner = new HashSet<TimelineDomain>();
|
||||||
|
domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
|
||||||
|
}
|
||||||
|
if (domainToReplace != null) {
|
||||||
|
domainsByOneOwner.remove(domainToReplace);
|
||||||
|
}
|
||||||
|
domainsByOneOwner.add(domainToStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class KeyValueBasedTimelineStoreUtils {
|
||||||
|
|
||||||
|
static TimelineDomain createTimelineDomain(
|
||||||
|
String id, String description, String owner,
|
||||||
|
String readers, String writers,
|
||||||
|
Long createdTime, Long modifiedTime) {
|
||||||
|
TimelineDomain domainToStore = new TimelineDomain();
|
||||||
|
domainToStore.setId(id);
|
||||||
|
domainToStore.setDescription(description);
|
||||||
|
domainToStore.setOwner(owner);
|
||||||
|
domainToStore.setReaders(readers);
|
||||||
|
domainToStore.setWriters(writers);
|
||||||
|
domainToStore.setCreatedTime(createdTime);
|
||||||
|
domainToStore.setModifiedTime(modifiedTime);
|
||||||
|
return domainToStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
static TimelineEntity maskFields(
|
||||||
|
TimelineEntity entity, EnumSet<Field> fields) {
|
||||||
|
// Conceal the fields that are not going to be exposed
|
||||||
|
TimelineEntity entityToReturn = new TimelineEntity();
|
||||||
|
entityToReturn.setEntityId(entity.getEntityId());
|
||||||
|
entityToReturn.setEntityType(entity.getEntityType());
|
||||||
|
entityToReturn.setStartTime(entity.getStartTime());
|
||||||
|
entityToReturn.setDomainId(entity.getDomainId());
|
||||||
|
// Deep copy
|
||||||
|
if (fields.contains(Field.EVENTS)) {
|
||||||
|
entityToReturn.addEvents(entity.getEvents());
|
||||||
|
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
|
||||||
|
entityToReturn.addEvent(entity.getEvents().get(0));
|
||||||
|
} else {
|
||||||
|
entityToReturn.setEvents(null);
|
||||||
|
}
|
||||||
|
if (fields.contains(Field.RELATED_ENTITIES)) {
|
||||||
|
entityToReturn.addRelatedEntities(entity.getRelatedEntities());
|
||||||
|
} else {
|
||||||
|
entityToReturn.setRelatedEntities(null);
|
||||||
|
}
|
||||||
|
if (fields.contains(Field.PRIMARY_FILTERS)) {
|
||||||
|
entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
|
||||||
|
} else {
|
||||||
|
entityToReturn.setPrimaryFilters(null);
|
||||||
|
}
|
||||||
|
if (fields.contains(Field.OTHER_INFO)) {
|
||||||
|
entityToReturn.addOtherInfo(entity.getOtherInfo());
|
||||||
|
} else {
|
||||||
|
entityToReturn.setOtherInfo(null);
|
||||||
|
}
|
||||||
|
return entityToReturn;
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean matchFilter(Map<String, Object> tags,
|
||||||
|
NameValuePair filter) {
|
||||||
|
Object value = tags.get(filter.getName());
|
||||||
|
if (value == null) { // doesn't have the filter
|
||||||
|
return false;
|
||||||
|
} else if (!value.equals(filter.getValue())) { // doesn't match the filter
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
|
||||||
|
NameValuePair filter) {
|
||||||
|
Set<Object> value = tags.get(filter.getName());
|
||||||
|
if (value == null) { // doesn't have the filter
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return value.contains(filter.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Object compactNumber(Object o) {
|
||||||
|
if (o instanceof Long) {
|
||||||
|
Long l = (Long) o;
|
||||||
|
if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
|
||||||
|
return l.intValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,38 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timeline;
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.PriorityQueue;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In-memory implementation of {@link TimelineStore}. This
|
* In-memory implementation of {@link TimelineStore}. This
|
||||||
|
@ -62,448 +37,60 @@ import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class MemoryTimelineStore
|
public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
|
||||||
extends AbstractService implements TimelineStore {
|
|
||||||
|
|
||||||
private Map<EntityIdentifier, TimelineEntity> entities =
|
static class HashMapStoreAdapter<K, V>
|
||||||
new HashMap<EntityIdentifier, TimelineEntity>();
|
implements TimelineStoreMapAdapter<K, V> {
|
||||||
private Map<EntityIdentifier, Long> entityInsertTimes =
|
Map<K, V> internalMap = new HashMap<>();
|
||||||
new HashMap<EntityIdentifier, Long>();
|
|
||||||
private Map<String, TimelineDomain> domainsById =
|
@Override
|
||||||
new HashMap<String, TimelineDomain>();
|
public V get(K key) {
|
||||||
private Map<String, Set<TimelineDomain>> domainsByOwner =
|
return internalMap.get(key);
|
||||||
new HashMap<String, Set<TimelineDomain>>();
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(K key, V value) {
|
||||||
|
internalMap.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove(K key) {
|
||||||
|
internalMap.remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<V>
|
||||||
|
valueSetIterator() {
|
||||||
|
return new TreeSet<>(internalMap.values()).iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public Iterator<V> valueSetIterator(V minV) {
|
||||||
|
if (minV instanceof Comparable) {
|
||||||
|
TreeSet<V> tempTreeSet = new TreeSet<>();
|
||||||
|
for (V value : internalMap.values()) {
|
||||||
|
if (((Comparable) value).compareTo(minV) >= 0) {
|
||||||
|
tempTreeSet.add(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tempTreeSet.iterator();
|
||||||
|
} else {
|
||||||
|
return valueSetIterator();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public MemoryTimelineStore() {
|
public MemoryTimelineStore() {
|
||||||
super(MemoryTimelineStore.class.getName());
|
this(MemoryTimelineStore.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public MemoryTimelineStore(String name) {
|
||||||
public synchronized TimelineEntities getEntities(String entityType, Long limit,
|
super(name);
|
||||||
Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
entities = new HashMapStoreAdapter<>();
|
||||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
entityInsertTimes = new HashMapStoreAdapter<>();
|
||||||
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
|
domainById = new HashMapStoreAdapter<>();
|
||||||
if (limit == null) {
|
domainsByOwner = new HashMapStoreAdapter<>();
|
||||||
limit = DEFAULT_LIMIT;
|
|
||||||
}
|
|
||||||
if (windowStart == null) {
|
|
||||||
windowStart = Long.MIN_VALUE;
|
|
||||||
}
|
|
||||||
if (windowEnd == null) {
|
|
||||||
windowEnd = Long.MAX_VALUE;
|
|
||||||
}
|
|
||||||
if (fields == null) {
|
|
||||||
fields = EnumSet.allOf(Field.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
Iterator<TimelineEntity> entityIterator = null;
|
|
||||||
if (fromId != null) {
|
|
||||||
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId,
|
|
||||||
entityType));
|
|
||||||
if (firstEntity == null) {
|
|
||||||
return new TimelineEntities();
|
|
||||||
} else {
|
|
||||||
entityIterator = new TreeSet<TimelineEntity>(entities.values())
|
|
||||||
.tailSet(firstEntity, true).iterator();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (entityIterator == null) {
|
|
||||||
entityIterator = new PriorityQueue<TimelineEntity>(entities.values())
|
|
||||||
.iterator();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
|
|
||||||
while (entityIterator.hasNext()) {
|
|
||||||
TimelineEntity entity = entityIterator.next();
|
|
||||||
if (entitiesSelected.size() >= limit) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!entity.getEntityType().equals(entityType)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (entity.getStartTime() <= windowStart) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (entity.getStartTime() > windowEnd) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier(
|
|
||||||
entity.getEntityId(), entity.getEntityType())) > fromTs) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (primaryFilter != null &&
|
|
||||||
!matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (secondaryFilters != null) { // AND logic
|
|
||||||
boolean flag = true;
|
|
||||||
for (NameValuePair secondaryFilter : secondaryFilters) {
|
|
||||||
if (secondaryFilter != null && !matchPrimaryFilter(
|
|
||||||
entity.getPrimaryFilters(), secondaryFilter) &&
|
|
||||||
!matchFilter(entity.getOtherInfo(), secondaryFilter)) {
|
|
||||||
flag = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!flag) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (entity.getDomainId() == null) {
|
|
||||||
entity.setDomainId(DEFAULT_DOMAIN_ID);
|
|
||||||
}
|
|
||||||
if (checkAcl == null || checkAcl.check(entity)) {
|
|
||||||
entitiesSelected.add(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
|
|
||||||
for (TimelineEntity entitySelected : entitiesSelected) {
|
|
||||||
entitiesToReturn.add(maskFields(entitySelected, fields));
|
|
||||||
}
|
|
||||||
Collections.sort(entitiesToReturn);
|
|
||||||
TimelineEntities entitiesWrapper = new TimelineEntities();
|
|
||||||
entitiesWrapper.setEntities(entitiesToReturn);
|
|
||||||
return entitiesWrapper;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized TimelineEntity getEntity(String entityId, String entityType,
|
|
||||||
EnumSet<Field> fieldsToRetrieve) {
|
|
||||||
if (fieldsToRetrieve == null) {
|
|
||||||
fieldsToRetrieve = EnumSet.allOf(Field.class);
|
|
||||||
}
|
|
||||||
TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
|
|
||||||
if (entity == null) {
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
return maskFields(entity, fieldsToRetrieve);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized TimelineEvents getEntityTimelines(String entityType,
|
|
||||||
SortedSet<String> entityIds, Long limit, Long windowStart,
|
|
||||||
Long windowEnd,
|
|
||||||
Set<String> eventTypes) {
|
|
||||||
TimelineEvents allEvents = new TimelineEvents();
|
|
||||||
if (entityIds == null) {
|
|
||||||
return allEvents;
|
|
||||||
}
|
|
||||||
if (limit == null) {
|
|
||||||
limit = DEFAULT_LIMIT;
|
|
||||||
}
|
|
||||||
if (windowStart == null) {
|
|
||||||
windowStart = Long.MIN_VALUE;
|
|
||||||
}
|
|
||||||
if (windowEnd == null) {
|
|
||||||
windowEnd = Long.MAX_VALUE;
|
|
||||||
}
|
|
||||||
for (String entityId : entityIds) {
|
|
||||||
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
|
|
||||||
TimelineEntity entity = entities.get(entityID);
|
|
||||||
if (entity == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
EventsOfOneEntity events = new EventsOfOneEntity();
|
|
||||||
events.setEntityId(entityId);
|
|
||||||
events.setEntityType(entityType);
|
|
||||||
for (TimelineEvent event : entity.getEvents()) {
|
|
||||||
if (events.getEvents().size() >= limit) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (event.getTimestamp() <= windowStart) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (event.getTimestamp() > windowEnd) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
events.addEvent(event);
|
|
||||||
}
|
|
||||||
allEvents.addEvent(events);
|
|
||||||
}
|
|
||||||
return allEvents;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TimelineDomain getDomain(String domainId)
|
|
||||||
throws IOException {
|
|
||||||
TimelineDomain domain = domainsById.get(domainId);
|
|
||||||
if (domain == null) {
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
return createTimelineDomain(
|
|
||||||
domain.getId(),
|
|
||||||
domain.getDescription(),
|
|
||||||
domain.getOwner(),
|
|
||||||
domain.getReaders(),
|
|
||||||
domain.getWriters(),
|
|
||||||
domain.getCreatedTime(),
|
|
||||||
domain.getModifiedTime());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TimelineDomains getDomains(String owner)
|
|
||||||
throws IOException {
|
|
||||||
List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
|
|
||||||
Set<TimelineDomain> domainsOfOneOwner = domainsByOwner.get(owner);
|
|
||||||
if (domainsOfOneOwner == null) {
|
|
||||||
return new TimelineDomains();
|
|
||||||
}
|
|
||||||
for (TimelineDomain domain : domainsByOwner.get(owner)) {
|
|
||||||
TimelineDomain domainToReturn = createTimelineDomain(
|
|
||||||
domain.getId(),
|
|
||||||
domain.getDescription(),
|
|
||||||
domain.getOwner(),
|
|
||||||
domain.getReaders(),
|
|
||||||
domain.getWriters(),
|
|
||||||
domain.getCreatedTime(),
|
|
||||||
domain.getModifiedTime());
|
|
||||||
domains.add(domainToReturn);
|
|
||||||
}
|
|
||||||
Collections.sort(domains, new Comparator<TimelineDomain>() {
|
|
||||||
@Override
|
|
||||||
public int compare(
|
|
||||||
TimelineDomain domain1, TimelineDomain domain2) {
|
|
||||||
int result = domain2.getCreatedTime().compareTo(
|
|
||||||
domain1.getCreatedTime());
|
|
||||||
if (result == 0) {
|
|
||||||
return domain2.getModifiedTime().compareTo(
|
|
||||||
domain1.getModifiedTime());
|
|
||||||
} else {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
TimelineDomains domainsToReturn = new TimelineDomains();
|
|
||||||
domainsToReturn.addDomains(domains);
|
|
||||||
return domainsToReturn;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized TimelinePutResponse put(TimelineEntities data) {
|
|
||||||
TimelinePutResponse response = new TimelinePutResponse();
|
|
||||||
for (TimelineEntity entity : data.getEntities()) {
|
|
||||||
EntityIdentifier entityId =
|
|
||||||
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
|
|
||||||
// store entity info in memory
|
|
||||||
TimelineEntity existingEntity = entities.get(entityId);
|
|
||||||
if (existingEntity == null) {
|
|
||||||
existingEntity = new TimelineEntity();
|
|
||||||
existingEntity.setEntityId(entity.getEntityId());
|
|
||||||
existingEntity.setEntityType(entity.getEntityType());
|
|
||||||
existingEntity.setStartTime(entity.getStartTime());
|
|
||||||
if (entity.getDomainId() == null ||
|
|
||||||
entity.getDomainId().length() == 0) {
|
|
||||||
TimelinePutError error = new TimelinePutError();
|
|
||||||
error.setEntityId(entityId.getId());
|
|
||||||
error.setEntityType(entityId.getType());
|
|
||||||
error.setErrorCode(TimelinePutError.NO_DOMAIN);
|
|
||||||
response.addError(error);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
existingEntity.setDomainId(entity.getDomainId());
|
|
||||||
entities.put(entityId, existingEntity);
|
|
||||||
entityInsertTimes.put(entityId, System.currentTimeMillis());
|
|
||||||
}
|
|
||||||
if (entity.getEvents() != null) {
|
|
||||||
if (existingEntity.getEvents() == null) {
|
|
||||||
existingEntity.setEvents(entity.getEvents());
|
|
||||||
} else {
|
|
||||||
existingEntity.addEvents(entity.getEvents());
|
|
||||||
}
|
|
||||||
Collections.sort(existingEntity.getEvents());
|
|
||||||
}
|
|
||||||
// check startTime
|
|
||||||
if (existingEntity.getStartTime() == null) {
|
|
||||||
if (existingEntity.getEvents() == null
|
|
||||||
|| existingEntity.getEvents().isEmpty()) {
|
|
||||||
TimelinePutError error = new TimelinePutError();
|
|
||||||
error.setEntityId(entityId.getId());
|
|
||||||
error.setEntityType(entityId.getType());
|
|
||||||
error.setErrorCode(TimelinePutError.NO_START_TIME);
|
|
||||||
response.addError(error);
|
|
||||||
entities.remove(entityId);
|
|
||||||
entityInsertTimes.remove(entityId);
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
Long min = Long.MAX_VALUE;
|
|
||||||
for (TimelineEvent e : entity.getEvents()) {
|
|
||||||
if (min > e.getTimestamp()) {
|
|
||||||
min = e.getTimestamp();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
existingEntity.setStartTime(min);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (entity.getPrimaryFilters() != null) {
|
|
||||||
if (existingEntity.getPrimaryFilters() == null) {
|
|
||||||
existingEntity.setPrimaryFilters(new HashMap<String, Set<Object>>());
|
|
||||||
}
|
|
||||||
for (Entry<String, Set<Object>> pf :
|
|
||||||
entity.getPrimaryFilters().entrySet()) {
|
|
||||||
for (Object pfo : pf.getValue()) {
|
|
||||||
existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (entity.getOtherInfo() != null) {
|
|
||||||
if (existingEntity.getOtherInfo() == null) {
|
|
||||||
existingEntity.setOtherInfo(new HashMap<String, Object>());
|
|
||||||
}
|
|
||||||
for (Entry<String, Object> info : entity.getOtherInfo().entrySet()) {
|
|
||||||
existingEntity.addOtherInfo(info.getKey(),
|
|
||||||
maybeConvert(info.getValue()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// relate it to other entities
|
|
||||||
if (entity.getRelatedEntities() == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, Set<String>> partRelatedEntities : entity
|
|
||||||
.getRelatedEntities().entrySet()) {
|
|
||||||
if (partRelatedEntities == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (String idStr : partRelatedEntities.getValue()) {
|
|
||||||
EntityIdentifier relatedEntityId =
|
|
||||||
new EntityIdentifier(idStr, partRelatedEntities.getKey());
|
|
||||||
TimelineEntity relatedEntity = entities.get(relatedEntityId);
|
|
||||||
if (relatedEntity != null) {
|
|
||||||
if (relatedEntity.getDomainId().equals(
|
|
||||||
existingEntity.getDomainId())) {
|
|
||||||
relatedEntity.addRelatedEntity(
|
|
||||||
existingEntity.getEntityType(), existingEntity.getEntityId());
|
|
||||||
} else {
|
|
||||||
// in this case the entity will be put, but the relation will be
|
|
||||||
// ignored
|
|
||||||
TimelinePutError error = new TimelinePutError();
|
|
||||||
error.setEntityType(existingEntity.getEntityType());
|
|
||||||
error.setEntityId(existingEntity.getEntityId());
|
|
||||||
error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
|
|
||||||
response.addError(error);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
relatedEntity = new TimelineEntity();
|
|
||||||
relatedEntity.setEntityId(relatedEntityId.getId());
|
|
||||||
relatedEntity.setEntityType(relatedEntityId.getType());
|
|
||||||
relatedEntity.setStartTime(existingEntity.getStartTime());
|
|
||||||
relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
|
|
||||||
existingEntity.getEntityId());
|
|
||||||
relatedEntity.setDomainId(existingEntity.getDomainId());
|
|
||||||
entities.put(relatedEntityId, relatedEntity);
|
|
||||||
entityInsertTimes.put(relatedEntityId, System.currentTimeMillis());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void put(TimelineDomain domain) throws IOException {
|
|
||||||
TimelineDomain domainToReplace =
|
|
||||||
domainsById.get(domain.getId());
|
|
||||||
Long currentTimestamp = System.currentTimeMillis();
|
|
||||||
TimelineDomain domainToStore = createTimelineDomain(
|
|
||||||
domain.getId(), domain.getDescription(), domain.getOwner(),
|
|
||||||
domain.getReaders(), domain.getWriters(),
|
|
||||||
(domainToReplace == null ?
|
|
||||||
currentTimestamp : domainToReplace.getCreatedTime()),
|
|
||||||
currentTimestamp);
|
|
||||||
domainsById.put(domainToStore.getId(), domainToStore);
|
|
||||||
Set<TimelineDomain> domainsByOneOwner =
|
|
||||||
domainsByOwner.get(domainToStore.getOwner());
|
|
||||||
if (domainsByOneOwner == null) {
|
|
||||||
domainsByOneOwner = new HashSet<TimelineDomain>();
|
|
||||||
domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner);
|
|
||||||
}
|
|
||||||
if (domainToReplace != null) {
|
|
||||||
domainsByOneOwner.remove(domainToReplace);
|
|
||||||
}
|
|
||||||
domainsByOneOwner.add(domainToStore);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TimelineDomain createTimelineDomain(
|
|
||||||
String id, String description, String owner,
|
|
||||||
String readers, String writers,
|
|
||||||
Long createdTime, Long modifiedTime) {
|
|
||||||
TimelineDomain domainToStore = new TimelineDomain();
|
|
||||||
domainToStore.setId(id);
|
|
||||||
domainToStore.setDescription(description);
|
|
||||||
domainToStore.setOwner(owner);
|
|
||||||
domainToStore.setReaders(readers);
|
|
||||||
domainToStore.setWriters(writers);
|
|
||||||
domainToStore.setCreatedTime(createdTime);
|
|
||||||
domainToStore.setModifiedTime(modifiedTime);
|
|
||||||
return domainToStore;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TimelineEntity maskFields(
|
|
||||||
TimelineEntity entity, EnumSet<Field> fields) {
|
|
||||||
// Conceal the fields that are not going to be exposed
|
|
||||||
TimelineEntity entityToReturn = new TimelineEntity();
|
|
||||||
entityToReturn.setEntityId(entity.getEntityId());
|
|
||||||
entityToReturn.setEntityType(entity.getEntityType());
|
|
||||||
entityToReturn.setStartTime(entity.getStartTime());
|
|
||||||
entityToReturn.setDomainId(entity.getDomainId());
|
|
||||||
// Deep copy
|
|
||||||
if (fields.contains(Field.EVENTS)) {
|
|
||||||
entityToReturn.addEvents(entity.getEvents());
|
|
||||||
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
|
|
||||||
entityToReturn.addEvent(entity.getEvents().get(0));
|
|
||||||
} else {
|
|
||||||
entityToReturn.setEvents(null);
|
|
||||||
}
|
|
||||||
if (fields.contains(Field.RELATED_ENTITIES)) {
|
|
||||||
entityToReturn.addRelatedEntities(entity.getRelatedEntities());
|
|
||||||
} else {
|
|
||||||
entityToReturn.setRelatedEntities(null);
|
|
||||||
}
|
|
||||||
if (fields.contains(Field.PRIMARY_FILTERS)) {
|
|
||||||
entityToReturn.addPrimaryFilters(entity.getPrimaryFilters());
|
|
||||||
} else {
|
|
||||||
entityToReturn.setPrimaryFilters(null);
|
|
||||||
}
|
|
||||||
if (fields.contains(Field.OTHER_INFO)) {
|
|
||||||
entityToReturn.addOtherInfo(entity.getOtherInfo());
|
|
||||||
} else {
|
|
||||||
entityToReturn.setOtherInfo(null);
|
|
||||||
}
|
|
||||||
return entityToReturn;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean matchFilter(Map<String, Object> tags,
|
|
||||||
NameValuePair filter) {
|
|
||||||
Object value = tags.get(filter.getName());
|
|
||||||
if (value == null) { // doesn't have the filter
|
|
||||||
return false;
|
|
||||||
} else if (!value.equals(filter.getValue())) { // doesn't match the filter
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
|
|
||||||
NameValuePair filter) {
|
|
||||||
Set<Object> value = tags.get(filter.getName());
|
|
||||||
if (value == null) { // doesn't have the filter
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
return value.contains(filter.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Object maybeConvert(Object o) {
|
|
||||||
if (o instanceof Long) {
|
|
||||||
Long l = (Long)o;
|
|
||||||
if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
|
|
||||||
return l.intValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return o;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An adapter for map timeline store implementations
|
||||||
|
* @param <K> the type of the key set
|
||||||
|
* @param <V> the type of the value set
|
||||||
|
*/
|
||||||
|
interface TimelineStoreMapAdapter<K, V> {
|
||||||
|
/**
|
||||||
|
* @param key
|
||||||
|
* @return map(key)
|
||||||
|
*/
|
||||||
|
V get(K key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add mapping key->value in the map
|
||||||
|
* @param key
|
||||||
|
* @param value
|
||||||
|
*/
|
||||||
|
void put(K key, V value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove mapping with key keyToRemove
|
||||||
|
* @param keyToRemove
|
||||||
|
*/
|
||||||
|
void remove(K keyToRemove);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the iterator of the value set of the map
|
||||||
|
*/
|
||||||
|
Iterator<V> valueSetIterator();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the iterator of the value set of the map, starting from minV if type
|
||||||
|
* V is comparable.
|
||||||
|
* @param minV
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
Iterator<V> valueSetIterator(V minV);
|
||||||
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.timeline.util;
|
package org.apache.hadoop.yarn.server.timeline.util;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.WritableComparator;
|
import org.apache.hadoop.io.WritableComparator;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -177,4 +178,10 @@ public class LeveldbUtils {
|
||||||
prefixlen) == 0;
|
prefixlen) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default permission mask for the level db dir
|
||||||
|
*/
|
||||||
|
public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
|
||||||
|
.createImmutable((short) 0700);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,5 +132,9 @@
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -102,7 +102,8 @@ public class EntityCacheItem {
|
||||||
}
|
}
|
||||||
if (!appLogs.getDetailLogs().isEmpty()) {
|
if (!appLogs.getDetailLogs().isEmpty()) {
|
||||||
if (store == null) {
|
if (store == null) {
|
||||||
store = new MemoryTimelineStore();
|
store = new LevelDBCacheTimelineStore(groupId.toString(),
|
||||||
|
"LeveldbCache." + groupId);
|
||||||
store.init(config);
|
store.init(config);
|
||||||
store.start();
|
store.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,316 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
|
||||||
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
|
import org.iq80.leveldb.DB;
|
||||||
|
import org.iq80.leveldb.DBIterator;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LevelDB implementation of {@link KeyValueBasedTimelineStore}. This
|
||||||
|
* implementation stores the entity hash map into a LevelDB instance.
|
||||||
|
* There are two partitions of the key space. One partition is to store a
|
||||||
|
* entity id to start time mapping:
|
||||||
|
*
|
||||||
|
* i!ENTITY_ID!ENTITY_TYPE -> ENTITY_START_TIME
|
||||||
|
*
|
||||||
|
* The other partition is to store the actual data:
|
||||||
|
*
|
||||||
|
* e!START_TIME!ENTITY_ID!ENTITY_TYPE -> ENTITY_BYTES
|
||||||
|
*
|
||||||
|
* This storage does not have any garbage collection mechanism, and is designed
|
||||||
|
* mainly for caching usages.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
|
||||||
|
private static final Log LOG
|
||||||
|
= LogFactory.getLog(LevelDBCacheTimelineStore.class);
|
||||||
|
private static final String CACHED_LDB_FILE_PREFIX = "-timeline-cache.ldb";
|
||||||
|
private String dbId;
|
||||||
|
private DB entityDb;
|
||||||
|
private Configuration configuration;
|
||||||
|
|
||||||
|
public LevelDBCacheTimelineStore(String id, String name) {
|
||||||
|
super(name);
|
||||||
|
dbId = id;
|
||||||
|
entityInsertTimes = new MemoryTimelineStore.HashMapStoreAdapter<>();
|
||||||
|
domainById = new MemoryTimelineStore.HashMapStoreAdapter<>();
|
||||||
|
domainsByOwner = new MemoryTimelineStore.HashMapStoreAdapter<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public LevelDBCacheTimelineStore(String id) {
|
||||||
|
this(id, LevelDBCacheTimelineStore.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
configuration = conf;
|
||||||
|
Options options = new Options();
|
||||||
|
options.createIfMissing(true);
|
||||||
|
options.cacheSize(conf.getLong(
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE,
|
||||||
|
YarnConfiguration.
|
||||||
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE));
|
||||||
|
JniDBFactory factory = new JniDBFactory();
|
||||||
|
Path dbPath = new Path(
|
||||||
|
conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
|
||||||
|
dbId + CACHED_LDB_FILE_PREFIX);
|
||||||
|
FileSystem localFS = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
localFS = FileSystem.getLocal(conf);
|
||||||
|
if (!localFS.exists(dbPath)) {
|
||||||
|
if (!localFS.mkdirs(dbPath)) {
|
||||||
|
throw new IOException("Couldn't create directory for leveldb " +
|
||||||
|
"timeline store " + dbPath);
|
||||||
|
}
|
||||||
|
localFS.setPermission(dbPath, LeveldbUtils.LEVELDB_DIR_UMASK);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, localFS);
|
||||||
|
}
|
||||||
|
LOG.info("Using leveldb path " + dbPath);
|
||||||
|
entityDb = factory.open(new File(dbPath.toString()), options);
|
||||||
|
entities = new LevelDBMapAdapter<>(entityDb);
|
||||||
|
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized void serviceStop() throws Exception {
|
||||||
|
IOUtils.cleanup(LOG, entityDb);
|
||||||
|
Path dbPath = new Path(
|
||||||
|
configuration.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
|
||||||
|
dbId + CACHED_LDB_FILE_PREFIX);
|
||||||
|
FileSystem localFS = null;
|
||||||
|
try {
|
||||||
|
localFS = FileSystem.getLocal(configuration);
|
||||||
|
if (!localFS.delete(dbPath, true)) {
|
||||||
|
throw new IOException("Couldn't delete data file for leveldb " +
|
||||||
|
"timeline store " + dbPath);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, localFS);
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A specialized hash map storage that uses LevelDB for storing entity id to
|
||||||
|
* entity mappings.
|
||||||
|
*
|
||||||
|
* @param <K> an {@link EntityIdentifier} typed hash key
|
||||||
|
* @param <V> a {@link TimelineEntity} typed value
|
||||||
|
*/
|
||||||
|
static class LevelDBMapAdapter<K extends EntityIdentifier,
|
||||||
|
V extends TimelineEntity> implements TimelineStoreMapAdapter<K, V> {
|
||||||
|
private static final String TIME_INDEX_PREFIX = "i";
|
||||||
|
private static final String ENTITY_STORAGE_PREFIX = "e";
|
||||||
|
DB entityDb;
|
||||||
|
|
||||||
|
public LevelDBMapAdapter(DB currLevelDb) {
|
||||||
|
entityDb = currLevelDb;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public V get(K entityId) {
|
||||||
|
V result = null;
|
||||||
|
// Read the start time from the index
|
||||||
|
byte[] startTimeBytes = entityDb.get(getStartTimeKey(entityId));
|
||||||
|
if (startTimeBytes == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the key for the entity storage and read it
|
||||||
|
try {
|
||||||
|
result = getEntityForKey(getEntityKey(entityId, startTimeBytes));
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("GenericObjectMapper cannot read key from key "
|
||||||
|
+ entityId.toString()
|
||||||
|
+ " into an object. Read aborted! ");
|
||||||
|
LOG.error(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(K entityId, V entity) {
|
||||||
|
Long startTime = entity.getStartTime();
|
||||||
|
if (startTime == null) {
|
||||||
|
startTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
// Build the key for the entity storage and read it
|
||||||
|
byte[] startTimeBytes = GenericObjectMapper.writeReverseOrderedLong(
|
||||||
|
startTime);
|
||||||
|
try {
|
||||||
|
byte[] valueBytes = GenericObjectMapper.write(entity);
|
||||||
|
entityDb.put(getEntityKey(entityId, startTimeBytes), valueBytes);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("GenericObjectMapper cannot write "
|
||||||
|
+ entity.getClass().getName()
|
||||||
|
+ " into a byte array. Write aborted! ");
|
||||||
|
LOG.error(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the key for the start time index
|
||||||
|
entityDb.put(getStartTimeKey(entityId), startTimeBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove(K entityId) {
|
||||||
|
// Read the start time from the index (key starts with an "i") then delete
|
||||||
|
// the record
|
||||||
|
LeveldbUtils.KeyBuilder startTimeKeyBuilder
|
||||||
|
= LeveldbUtils.KeyBuilder.newInstance();
|
||||||
|
startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
|
||||||
|
.add(entityId.getType());
|
||||||
|
byte[] startTimeBytes = entityDb.get(startTimeKeyBuilder.getBytes());
|
||||||
|
if (startTimeBytes == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
entityDb.delete(startTimeKeyBuilder.getBytes());
|
||||||
|
|
||||||
|
// Build the key for the entity storage and delete it
|
||||||
|
entityDb.delete(getEntityKey(entityId, startTimeBytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<V> valueSetIterator() {
|
||||||
|
return getIterator(null, Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<V> valueSetIterator(V minV) {
|
||||||
|
return getIterator(
|
||||||
|
new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
|
||||||
|
minV.getStartTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Iterator<V> getIterator(
|
||||||
|
EntityIdentifier startId, long startTimeMax) {
|
||||||
|
|
||||||
|
final DBIterator internalDbIterator = entityDb.iterator();
|
||||||
|
|
||||||
|
// we need to iterate from the first element with key greater than or
|
||||||
|
// equal to ENTITY_STORAGE_PREFIX!maxTS(!startId), but stop on the first
|
||||||
|
// key who does not have prefix ENTITY_STORATE_PREFIX
|
||||||
|
|
||||||
|
// decide end prefix
|
||||||
|
LeveldbUtils.KeyBuilder entityPrefixKeyBuilder
|
||||||
|
= LeveldbUtils.KeyBuilder.newInstance();
|
||||||
|
entityPrefixKeyBuilder.add(ENTITY_STORAGE_PREFIX);
|
||||||
|
final byte[] prefixBytes = entityPrefixKeyBuilder.getBytesForLookup();
|
||||||
|
// decide start prefix on top of end prefix and seek
|
||||||
|
final byte[] startTimeBytes
|
||||||
|
= GenericObjectMapper.writeReverseOrderedLong(startTimeMax);
|
||||||
|
entityPrefixKeyBuilder.add(startTimeBytes, true);
|
||||||
|
if (startId != null) {
|
||||||
|
entityPrefixKeyBuilder.add(startId.getId());
|
||||||
|
}
|
||||||
|
final byte[] startPrefixBytes
|
||||||
|
= entityPrefixKeyBuilder.getBytesForLookup();
|
||||||
|
internalDbIterator.seek(startPrefixBytes);
|
||||||
|
|
||||||
|
return new Iterator<V>() {
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
if (!internalDbIterator.hasNext()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Map.Entry<byte[], byte[]> nextEntry = internalDbIterator.peekNext();
|
||||||
|
if (LeveldbUtils.prefixMatches(
|
||||||
|
prefixBytes, prefixBytes.length, nextEntry.getKey())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public V next() {
|
||||||
|
if (hasNext()) {
|
||||||
|
Map.Entry<byte[], byte[]> nextRaw = internalDbIterator.next();
|
||||||
|
try {
|
||||||
|
V result = getEntityForKey(nextRaw.getKey());
|
||||||
|
return result;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("GenericObjectMapper cannot read key from key "
|
||||||
|
+ nextRaw.getKey()
|
||||||
|
+ " into an object. Read aborted! ");
|
||||||
|
LOG.error(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We do not support remove operations within one iteration
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
LOG.error("LevelDB map adapter does not support iterate-and-remove"
|
||||||
|
+ " use cases. ");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private V getEntityForKey(byte[] key) throws IOException {
|
||||||
|
byte[] resultRaw = entityDb.get(key);
|
||||||
|
if (resultRaw == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
ObjectMapper entityMapper = new ObjectMapper();
|
||||||
|
return (V) entityMapper.readValue(resultRaw, TimelineEntity.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getStartTimeKey(K entityId) {
|
||||||
|
LeveldbUtils.KeyBuilder startTimeKeyBuilder
|
||||||
|
= LeveldbUtils.KeyBuilder.newInstance();
|
||||||
|
startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId())
|
||||||
|
.add(entityId.getType());
|
||||||
|
return startTimeKeyBuilder.getBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getEntityKey(K entityId, byte[] startTimeBytes) {
|
||||||
|
LeveldbUtils.KeyBuilder entityKeyBuilder
|
||||||
|
= LeveldbUtils.KeyBuilder.newInstance();
|
||||||
|
entityKeyBuilder.add(ENTITY_STORAGE_PREFIX).add(startTimeBytes, true)
|
||||||
|
.add(entityId.getId()).add(entityId.getType());
|
||||||
|
return entityKeyBuilder.getBytes();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -200,7 +200,7 @@ class PluginStoreTestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
static TimelineDataManager getTdmWithMemStore(Configuration config) {
|
static TimelineDataManager getTdmWithMemStore(Configuration config) {
|
||||||
TimelineStore store = new MemoryTimelineStore();
|
TimelineStore store = new MemoryTimelineStore("MemoryStore.test");
|
||||||
TimelineDataManager tdm = getTdmWithStore(config, store);
|
TimelineDataManager tdm = getTdmWithStore(config, store);
|
||||||
return tdm;
|
return tdm;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class TestLevelDBCacheTimelineStore extends TimelineStoreTestUtils {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
store = new LevelDBCacheTimelineStore("app1");
|
||||||
|
store.init(new YarnConfiguration());
|
||||||
|
store.start();
|
||||||
|
loadTestEntityData();
|
||||||
|
loadVerificationEntityData();
|
||||||
|
loadTestDomainData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
store.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public TimelineStore getTimelineStore() {
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetSingleEntity() throws IOException {
|
||||||
|
super.testGetSingleEntity();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntities() throws IOException {
|
||||||
|
super.testGetEntities();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntitiesWithFromId() throws IOException {
|
||||||
|
super.testGetEntitiesWithFromId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntitiesWithFromTs() throws IOException {
|
||||||
|
super.testGetEntitiesWithFromTs();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntitiesWithPrimaryFilters() throws IOException {
|
||||||
|
super.testGetEntitiesWithPrimaryFilters();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntitiesWithSecondaryFilters() throws IOException {
|
||||||
|
super.testGetEntitiesWithSecondaryFilters();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEvents() throws IOException {
|
||||||
|
super.testGetEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomain() throws IOException {
|
||||||
|
super.testGetDomain();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDomains() throws IOException {
|
||||||
|
super.testGetDomains();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue