From 353518f54c81e1442b1e055f0987c406563f2d84 Mon Sep 17 00:00:00 2001 From: Xuan Date: Thu, 28 Jan 2016 14:24:22 -0800 Subject: [PATCH] YARN-4219. New levelDB cache storage for timeline v1.5. Contributed by Li Lu (cherry picked from commit 9fab22b36673e7f1a0bb629d2c07966ac2482e99) (cherry picked from commit 2f00d8d3a7e8b41d073c8ed0e6efcb12f8758c0e) --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 7 + .../src/main/resources/yarn-default.xml | 8 + .../timeline/KeyValueBasedTimelineStore.java | 574 ++++++++++++++++++ .../server/timeline/MemoryTimelineStore.java | 519 ++-------------- .../timeline/TimelineStoreMapAdapter.java | 60 ++ .../server/timeline/util/LeveldbUtils.java | 7 + .../pom.xml | 4 + .../yarn/server/timeline/EntityCacheItem.java | 3 +- .../timeline/LevelDBCacheTimelineStore.java | 316 ++++++++++ .../server/timeline/PluginStoreTestUtils.java | 2 +- .../TestLevelDBCacheTimelineStore.java | 94 +++ 12 files changed, 1128 insertions(+), 468 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 72ad3a4b7df..a4a3f3f5925 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -214,6 +214,8 @@ Release 2.8.0 - UNRELEASED YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. (Li Lu and Jason Lowe via junping_du) + YARN-4219. New levelDB cache storage for timeline v1.5. (Li Lu via xgong) + IMPROVEMENTS YARN-644. 
Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04f61da8a41..57bd1181119 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1646,6 +1646,13 @@ private static void addDeprecatedKeys() { DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + + "leveldb-cache-read-cache-size"; + + public static final long + DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE = 10 * 1024 * 1024; + public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS = TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs"; public static final long
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index fe308ec9e79..0add9889e4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2036,6 +2036,14 @@ <value>604800</value> </property> + +<property> + <name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name> + <description> + Read cache size for the leveldb cache storage in ATS v1.5 plugin storage. + </description> + <value>10485760</value> +</property>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java new file mode 100644 index 00000000000..79e2bf29990 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; + +import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; + +/** + * Map-based implementation of {@link TimelineStore}. A concrete map + * implementation is plugged into this store through a + * {@link TimelineStoreMapAdapter}. + * + * The methods are synchronized to avoid concurrent modifications.
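To make the contract concrete: a subclass wires adapter instances into the four protected maps before the service starts, exactly as MemoryTimelineStore does later in this patch. A minimal sketch under that assumption (the class name MyTimelineStore is hypothetical; HashMapStoreAdapter is the in-memory adapter this patch adds to MemoryTimelineStore):

    // Sketch only: plugging concrete map adapters into KeyValueBasedTimelineStore.
    public class MyTimelineStore extends KeyValueBasedTimelineStore {
      public MyTimelineStore() {
        super(MyTimelineStore.class.getName());
        entities = new MemoryTimelineStore.HashMapStoreAdapter<>();
        entityInsertTimes = new MemoryTimelineStore.HashMapStoreAdapter<>();
        domainById = new MemoryTimelineStore.HashMapStoreAdapter<>();
        domainsByOwner = new MemoryTimelineStore.HashMapStoreAdapter<>();
      }
    }

LevelDBCacheTimelineStore later in this patch follows the same pattern, swapping only the entities adapter for a LevelDB-backed one.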
+ * + */ +@Private +@Unstable +abstract class KeyValueBasedTimelineStore + extends AbstractService implements TimelineStore { + + protected TimelineStoreMapAdapter entities; + protected TimelineStoreMapAdapter entityInsertTimes; + protected TimelineStoreMapAdapter domainById; + protected TimelineStoreMapAdapter> domainsByOwner; + + private boolean serviceStopped = false; + + private static final Log LOG + = LogFactory.getLog(KeyValueBasedTimelineStore.class); + + public KeyValueBasedTimelineStore() { + super(KeyValueBasedTimelineStore.class.getName()); + } + + public KeyValueBasedTimelineStore(String name) { + super(name); + } + + public synchronized boolean getServiceStopped() { + return serviceStopped; + } + + @Override + protected synchronized void serviceStop() throws Exception { + serviceStopped = true; + super.serviceStop(); + } + + @Override + public synchronized TimelineEntities getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fields, CheckAcl checkAcl) throws IOException { + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + return null; + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (windowStart == null) { + windowStart = Long.MIN_VALUE; + } + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + + Iterator entityIterator = null; + if (fromId != null) { + TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, + entityType)); + if (firstEntity == null) { + return new TimelineEntities(); + } else { + entityIterator = entities.valueSetIterator(firstEntity); + } + } + if (entityIterator == null) { + entityIterator = entities.valueSetIterator(); + } + + List entitiesSelected = new ArrayList(); + while (entityIterator.hasNext()) { + TimelineEntity entity = entityIterator.next(); + if (entitiesSelected.size() >= limit) { + break; + } + if (!entity.getEntityType().equals(entityType)) { + continue; + } + if (entity.getStartTime() <= windowStart) { + continue; + } + if (entity.getStartTime() > windowEnd) { + continue; + } + if (fromTs != null && entityInsertTimes.get(new EntityIdentifier( + entity.getEntityId(), entity.getEntityType())) > fromTs) { + continue; + } + if (primaryFilter != null && + !KeyValueBasedTimelineStoreUtils.matchPrimaryFilter( + entity.getPrimaryFilters(), primaryFilter)) { + continue; + } + if (secondaryFilters != null) { // AND logic + boolean flag = true; + for (NameValuePair secondaryFilter : secondaryFilters) { + if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils + .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter) + && !KeyValueBasedTimelineStoreUtils.matchFilter( + entity.getOtherInfo(), secondaryFilter)) { + flag = false; + break; + } + } + if (!flag) { + continue; + } + } + if (entity.getDomainId() == null) { + entity.setDomainId(DEFAULT_DOMAIN_ID); + } + if (checkAcl == null || checkAcl.check(entity)) { + entitiesSelected.add(entity); + } + } + List entitiesToReturn = new ArrayList(); + for (TimelineEntity entitySelected : entitiesSelected) { + entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields( + entitySelected, fields)); + } + Collections.sort(entitiesToReturn); + TimelineEntities entitiesWrapper = new TimelineEntities(); + entitiesWrapper.setEntities(entitiesToReturn); + return entitiesWrapper; + } + + @Override + public synchronized TimelineEntity 
getEntity(String entityId, String entityType, + EnumSet fieldsToRetrieve) { + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + return null; + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.allOf(Field.class); + } + TimelineEntity + entity = entities.get(new EntityIdentifier(entityId, entityType)); + if (entity == null) { + return null; + } else { + return KeyValueBasedTimelineStoreUtils.maskFields( + entity, fieldsToRetrieve); + } + } + + @Override + public synchronized TimelineEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, + Set eventTypes) { + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + return null; + } + TimelineEvents allEvents = new TimelineEvents(); + if (entityIds == null) { + return allEvents; + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (windowStart == null) { + windowStart = Long.MIN_VALUE; + } + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + for (String entityId : entityIds) { + EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); + TimelineEntity entity = entities.get(entityID); + if (entity == null) { + continue; + } + EventsOfOneEntity events = new EventsOfOneEntity(); + events.setEntityId(entityId); + events.setEntityType(entityType); + for (TimelineEvent event : entity.getEvents()) { + if (events.getEvents().size() >= limit) { + break; + } + if (event.getTimestamp() <= windowStart) { + continue; + } + if (event.getTimestamp() > windowEnd) { + continue; + } + if (eventTypes != null && !eventTypes.contains(event.getEventType())) { + continue; + } + events.addEvent(event); + } + allEvents.addEvent(events); + } + return allEvents; + } + + @Override + public TimelineDomain getDomain(String domainId) + throws IOException { + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + return null; + } + TimelineDomain domain = domainById.get(domainId); + if (domain == null) { + return null; + } else { + return KeyValueBasedTimelineStoreUtils.createTimelineDomain( + domain.getId(), + domain.getDescription(), + domain.getOwner(), + domain.getReaders(), + domain.getWriters(), + domain.getCreatedTime(), + domain.getModifiedTime()); + } + } + + @Override + public TimelineDomains getDomains(String owner) + throws IOException { + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + return null; + } + List domains = new ArrayList(); + Set domainsOfOneOwner = domainsByOwner.get(owner); + if (domainsOfOneOwner == null) { + return new TimelineDomains(); + } + for (TimelineDomain domain : domainsByOwner.get(owner)) { + TimelineDomain domainToReturn = KeyValueBasedTimelineStoreUtils + .createTimelineDomain( + domain.getId(), + domain.getDescription(), + domain.getOwner(), + domain.getReaders(), + domain.getWriters(), + domain.getCreatedTime(), + domain.getModifiedTime()); + domains.add(domainToReturn); + } + Collections.sort(domains, new Comparator() { + @Override + public int compare( + TimelineDomain domain1, TimelineDomain domain2) { + int result = domain2.getCreatedTime().compareTo( + domain1.getCreatedTime()); + if (result == 0) { + return domain2.getModifiedTime().compareTo( + domain1.getModifiedTime()); + } else { + return result; + } + } + }); + TimelineDomains domainsToReturn = new TimelineDomains(); + domainsToReturn.addDomains(domains); + return domainsToReturn; + } + + @Override + public synchronized 
TimelinePutResponse put(TimelineEntities data) { + TimelinePutResponse response = new TimelinePutResponse(); + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + TimelinePutError error = new TimelinePutError(); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + return response; + } + for (TimelineEntity entity : data.getEntities()) { + EntityIdentifier entityId = + new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); + // store entity info in memory + TimelineEntity existingEntity = entities.get(entityId); + boolean needsPut = false; + if (existingEntity == null) { + existingEntity = new TimelineEntity(); + existingEntity.setEntityId(entity.getEntityId()); + existingEntity.setEntityType(entity.getEntityType()); + existingEntity.setStartTime(entity.getStartTime()); + if (entity.getDomainId() == null || + entity.getDomainId().length() == 0) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entityId.getId()); + error.setEntityType(entityId.getType()); + error.setErrorCode(TimelinePutError.NO_DOMAIN); + response.addError(error); + continue; + } + existingEntity.setDomainId(entity.getDomainId()); + // insert a new entity to the storage, update insert time map + entityInsertTimes.put(entityId, System.currentTimeMillis()); + needsPut = true; + } + if (entity.getEvents() != null) { + if (existingEntity.getEvents() == null) { + existingEntity.setEvents(entity.getEvents()); + } else { + existingEntity.addEvents(entity.getEvents()); + } + Collections.sort(existingEntity.getEvents()); + needsPut = true; + } + // check startTime + if (existingEntity.getStartTime() == null) { + if (existingEntity.getEvents() == null + || existingEntity.getEvents().isEmpty()) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entityId.getId()); + error.setEntityType(entityId.getType()); + error.setErrorCode(TimelinePutError.NO_START_TIME); + response.addError(error); + entities.remove(entityId); + entityInsertTimes.remove(entityId); + continue; + } else { + Long min = Long.MAX_VALUE; + for (TimelineEvent e : entity.getEvents()) { + if (min > e.getTimestamp()) { + min = e.getTimestamp(); + } + } + existingEntity.setStartTime(min); + needsPut = true; + } + } + if (entity.getPrimaryFilters() != null) { + if (existingEntity.getPrimaryFilters() == null) { + existingEntity.setPrimaryFilters(new HashMap>()); + } + for (Entry> pf : + entity.getPrimaryFilters().entrySet()) { + for (Object pfo : pf.getValue()) { + existingEntity.addPrimaryFilter(pf.getKey(), + KeyValueBasedTimelineStoreUtils.compactNumber(pfo)); + needsPut = true; + } + } + } + if (entity.getOtherInfo() != null) { + if (existingEntity.getOtherInfo() == null) { + existingEntity.setOtherInfo(new HashMap()); + } + for (Entry info : entity.getOtherInfo().entrySet()) { + existingEntity.addOtherInfo(info.getKey(), + KeyValueBasedTimelineStoreUtils.compactNumber(info.getValue())); + needsPut = true; + } + } + if (needsPut) { + entities.put(entityId, existingEntity); + } + + // relate it to other entities + if (entity.getRelatedEntities() == null) { + continue; + } + for (Entry> partRelatedEntities : entity + .getRelatedEntities().entrySet()) { + if (partRelatedEntities == null) { + continue; + } + for (String idStr : partRelatedEntities.getValue()) { + EntityIdentifier relatedEntityId = + new EntityIdentifier(idStr, partRelatedEntities.getKey()); + TimelineEntity relatedEntity = entities.get(relatedEntityId); + if (relatedEntity != null) { + 
if (relatedEntity.getDomainId().equals( + existingEntity.getDomainId())) { + relatedEntity.addRelatedEntity( + existingEntity.getEntityType(), existingEntity.getEntityId()); + entities.put(relatedEntityId, relatedEntity); + } else { + // in this case the entity will be put, but the relation will be + // ignored + TimelinePutError error = new TimelinePutError(); + error.setEntityType(existingEntity.getEntityType()); + error.setEntityId(existingEntity.getEntityId()); + error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION); + response.addError(error); + } + } else { + relatedEntity = new TimelineEntity(); + relatedEntity.setEntityId(relatedEntityId.getId()); + relatedEntity.setEntityType(relatedEntityId.getType()); + relatedEntity.setStartTime(existingEntity.getStartTime()); + relatedEntity.addRelatedEntity(existingEntity.getEntityType(), + existingEntity.getEntityId()); + relatedEntity.setDomainId(existingEntity.getDomainId()); + entities.put(relatedEntityId, relatedEntity); + entityInsertTimes.put(relatedEntityId, System.currentTimeMillis()); + } + } + } + } + return response; + } + + public void put(TimelineDomain domain) throws IOException { + if (getServiceStopped()) { + LOG.info("Service stopped, return null for the storage"); + return; + } + TimelineDomain domainToReplace = + domainById.get(domain.getId()); + Long currentTimestamp = System.currentTimeMillis(); + TimelineDomain domainToStore + = KeyValueBasedTimelineStoreUtils.createTimelineDomain( + domain.getId(), domain.getDescription(), domain.getOwner(), + domain.getReaders(), domain.getWriters(), + (domainToReplace == null ? + currentTimestamp : domainToReplace.getCreatedTime()), + currentTimestamp); + domainById.put(domainToStore.getId(), domainToStore); + Set domainsByOneOwner = + domainsByOwner.get(domainToStore.getOwner()); + if (domainsByOneOwner == null) { + domainsByOneOwner = new HashSet(); + domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner); + } + if (domainToReplace != null) { + domainsByOneOwner.remove(domainToReplace); + } + domainsByOneOwner.add(domainToStore); + } + + private static class KeyValueBasedTimelineStoreUtils { + + static TimelineDomain createTimelineDomain( + String id, String description, String owner, + String readers, String writers, + Long createdTime, Long modifiedTime) { + TimelineDomain domainToStore = new TimelineDomain(); + domainToStore.setId(id); + domainToStore.setDescription(description); + domainToStore.setOwner(owner); + domainToStore.setReaders(readers); + domainToStore.setWriters(writers); + domainToStore.setCreatedTime(createdTime); + domainToStore.setModifiedTime(modifiedTime); + return domainToStore; + } + + static TimelineEntity maskFields( + TimelineEntity entity, EnumSet fields) { + // Conceal the fields that are not going to be exposed + TimelineEntity entityToReturn = new TimelineEntity(); + entityToReturn.setEntityId(entity.getEntityId()); + entityToReturn.setEntityType(entity.getEntityType()); + entityToReturn.setStartTime(entity.getStartTime()); + entityToReturn.setDomainId(entity.getDomainId()); + // Deep copy + if (fields.contains(Field.EVENTS)) { + entityToReturn.addEvents(entity.getEvents()); + } else if (fields.contains(Field.LAST_EVENT_ONLY)) { + entityToReturn.addEvent(entity.getEvents().get(0)); + } else { + entityToReturn.setEvents(null); + } + if (fields.contains(Field.RELATED_ENTITIES)) { + entityToReturn.addRelatedEntities(entity.getRelatedEntities()); + } else { + entityToReturn.setRelatedEntities(null); + } + if 
(fields.contains(Field.PRIMARY_FILTERS)) { + entityToReturn.addPrimaryFilters(entity.getPrimaryFilters()); + } else { + entityToReturn.setPrimaryFilters(null); + } + if (fields.contains(Field.OTHER_INFO)) { + entityToReturn.addOtherInfo(entity.getOtherInfo()); + } else { + entityToReturn.setOtherInfo(null); + } + return entityToReturn; + } + + static boolean matchFilter(Map tags, + NameValuePair filter) { + Object value = tags.get(filter.getName()); + if (value == null) { // doesn't have the filter + return false; + } else if (!value.equals(filter.getValue())) { // doesn't match the filter + return false; + } + return true; + } + + static boolean matchPrimaryFilter(Map> tags, + NameValuePair filter) { + Set value = tags.get(filter.getName()); + if (value == null) { // doesn't have the filter + return false; + } else { + return value.contains(filter.getValue()); + } + } + + static Object compactNumber(Object o) { + if (o instanceof Long) { + Long l = (Long) o; + if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) { + return l.intValue(); + } + } + return o; + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java index 3489114233d..5c2db0004e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java @@ -18,38 +18,13 @@ package org.apache.hadoop.yarn.server.timeline; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; -import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; -import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeSet; /** * In-memory implementation of {@link TimelineStore}. 
This @@ -62,448 +37,60 @@ */ @Private @Unstable -public class MemoryTimelineStore - extends AbstractService implements TimelineStore { +public class MemoryTimelineStore extends KeyValueBasedTimelineStore { - private Map entities = - new HashMap(); - private Map entityInsertTimes = - new HashMap(); - private Map domainsById = - new HashMap(); - private Map> domainsByOwner = - new HashMap>(); + static class HashMapStoreAdapter + implements TimelineStoreMapAdapter { + Map internalMap = new HashMap<>(); + + @Override + public V get(K key) { + return internalMap.get(key); + } + + @Override + public void put(K key, V value) { + internalMap.put(key, value); + } + + @Override + public void remove(K key) { + internalMap.remove(key); + } + + @Override + public Iterator + valueSetIterator() { + return new TreeSet<>(internalMap.values()).iterator(); + } + + @Override + @SuppressWarnings("unchecked") + public Iterator valueSetIterator(V minV) { + if (minV instanceof Comparable) { + TreeSet tempTreeSet = new TreeSet<>(); + for (V value : internalMap.values()) { + if (((Comparable) value).compareTo(minV) >= 0) { + tempTreeSet.add(value); + } + } + return tempTreeSet.iterator(); + } else { + return valueSetIterator(); + } + } + } public MemoryTimelineStore() { - super(MemoryTimelineStore.class.getName()); + this(MemoryTimelineStore.class.getName()); } - @Override - public synchronized TimelineEntities getEntities(String entityType, Long limit, - Long windowStart, Long windowEnd, String fromId, Long fromTs, - NameValuePair primaryFilter, Collection secondaryFilters, - EnumSet fields, CheckAcl checkAcl) throws IOException { - if (limit == null) { - limit = DEFAULT_LIMIT; - } - if (windowStart == null) { - windowStart = Long.MIN_VALUE; - } - if (windowEnd == null) { - windowEnd = Long.MAX_VALUE; - } - if (fields == null) { - fields = EnumSet.allOf(Field.class); - } - - Iterator entityIterator = null; - if (fromId != null) { - TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, - entityType)); - if (firstEntity == null) { - return new TimelineEntities(); - } else { - entityIterator = new TreeSet(entities.values()) - .tailSet(firstEntity, true).iterator(); - } - } - if (entityIterator == null) { - entityIterator = new PriorityQueue(entities.values()) - .iterator(); - } - - List entitiesSelected = new ArrayList(); - while (entityIterator.hasNext()) { - TimelineEntity entity = entityIterator.next(); - if (entitiesSelected.size() >= limit) { - break; - } - if (!entity.getEntityType().equals(entityType)) { - continue; - } - if (entity.getStartTime() <= windowStart) { - continue; - } - if (entity.getStartTime() > windowEnd) { - continue; - } - if (fromTs != null && entityInsertTimes.get(new EntityIdentifier( - entity.getEntityId(), entity.getEntityType())) > fromTs) { - continue; - } - if (primaryFilter != null && - !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) { - continue; - } - if (secondaryFilters != null) { // AND logic - boolean flag = true; - for (NameValuePair secondaryFilter : secondaryFilters) { - if (secondaryFilter != null && !matchPrimaryFilter( - entity.getPrimaryFilters(), secondaryFilter) && - !matchFilter(entity.getOtherInfo(), secondaryFilter)) { - flag = false; - break; - } - } - if (!flag) { - continue; - } - } - if (entity.getDomainId() == null) { - entity.setDomainId(DEFAULT_DOMAIN_ID); - } - if (checkAcl == null || checkAcl.check(entity)) { - entitiesSelected.add(entity); - } - } - List entitiesToReturn = new ArrayList(); - for (TimelineEntity 
entitySelected : entitiesSelected) { - entitiesToReturn.add(maskFields(entitySelected, fields)); - } - Collections.sort(entitiesToReturn); - TimelineEntities entitiesWrapper = new TimelineEntities(); - entitiesWrapper.setEntities(entitiesToReturn); - return entitiesWrapper; - } - - @Override - public synchronized TimelineEntity getEntity(String entityId, String entityType, - EnumSet fieldsToRetrieve) { - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.allOf(Field.class); - } - TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType)); - if (entity == null) { - return null; - } else { - return maskFields(entity, fieldsToRetrieve); - } - } - - @Override - public synchronized TimelineEvents getEntityTimelines(String entityType, - SortedSet entityIds, Long limit, Long windowStart, - Long windowEnd, - Set eventTypes) { - TimelineEvents allEvents = new TimelineEvents(); - if (entityIds == null) { - return allEvents; - } - if (limit == null) { - limit = DEFAULT_LIMIT; - } - if (windowStart == null) { - windowStart = Long.MIN_VALUE; - } - if (windowEnd == null) { - windowEnd = Long.MAX_VALUE; - } - for (String entityId : entityIds) { - EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); - TimelineEntity entity = entities.get(entityID); - if (entity == null) { - continue; - } - EventsOfOneEntity events = new EventsOfOneEntity(); - events.setEntityId(entityId); - events.setEntityType(entityType); - for (TimelineEvent event : entity.getEvents()) { - if (events.getEvents().size() >= limit) { - break; - } - if (event.getTimestamp() <= windowStart) { - continue; - } - if (event.getTimestamp() > windowEnd) { - continue; - } - if (eventTypes != null && !eventTypes.contains(event.getEventType())) { - continue; - } - events.addEvent(event); - } - allEvents.addEvent(events); - } - return allEvents; - } - - @Override - public TimelineDomain getDomain(String domainId) - throws IOException { - TimelineDomain domain = domainsById.get(domainId); - if (domain == null) { - return null; - } else { - return createTimelineDomain( - domain.getId(), - domain.getDescription(), - domain.getOwner(), - domain.getReaders(), - domain.getWriters(), - domain.getCreatedTime(), - domain.getModifiedTime()); - } - } - - @Override - public TimelineDomains getDomains(String owner) - throws IOException { - List domains = new ArrayList(); - Set domainsOfOneOwner = domainsByOwner.get(owner); - if (domainsOfOneOwner == null) { - return new TimelineDomains(); - } - for (TimelineDomain domain : domainsByOwner.get(owner)) { - TimelineDomain domainToReturn = createTimelineDomain( - domain.getId(), - domain.getDescription(), - domain.getOwner(), - domain.getReaders(), - domain.getWriters(), - domain.getCreatedTime(), - domain.getModifiedTime()); - domains.add(domainToReturn); - } - Collections.sort(domains, new Comparator() { - @Override - public int compare( - TimelineDomain domain1, TimelineDomain domain2) { - int result = domain2.getCreatedTime().compareTo( - domain1.getCreatedTime()); - if (result == 0) { - return domain2.getModifiedTime().compareTo( - domain1.getModifiedTime()); - } else { - return result; - } - } - }); - TimelineDomains domainsToReturn = new TimelineDomains(); - domainsToReturn.addDomains(domains); - return domainsToReturn; - } - - @Override - public synchronized TimelinePutResponse put(TimelineEntities data) { - TimelinePutResponse response = new TimelinePutResponse(); - for (TimelineEntity entity : data.getEntities()) { - EntityIdentifier entityId = - new 
EntityIdentifier(entity.getEntityId(), entity.getEntityType()); - // store entity info in memory - TimelineEntity existingEntity = entities.get(entityId); - if (existingEntity == null) { - existingEntity = new TimelineEntity(); - existingEntity.setEntityId(entity.getEntityId()); - existingEntity.setEntityType(entity.getEntityType()); - existingEntity.setStartTime(entity.getStartTime()); - if (entity.getDomainId() == null || - entity.getDomainId().length() == 0) { - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entityId.getId()); - error.setEntityType(entityId.getType()); - error.setErrorCode(TimelinePutError.NO_DOMAIN); - response.addError(error); - continue; - } - existingEntity.setDomainId(entity.getDomainId()); - entities.put(entityId, existingEntity); - entityInsertTimes.put(entityId, System.currentTimeMillis()); - } - if (entity.getEvents() != null) { - if (existingEntity.getEvents() == null) { - existingEntity.setEvents(entity.getEvents()); - } else { - existingEntity.addEvents(entity.getEvents()); - } - Collections.sort(existingEntity.getEvents()); - } - // check startTime - if (existingEntity.getStartTime() == null) { - if (existingEntity.getEvents() == null - || existingEntity.getEvents().isEmpty()) { - TimelinePutError error = new TimelinePutError(); - error.setEntityId(entityId.getId()); - error.setEntityType(entityId.getType()); - error.setErrorCode(TimelinePutError.NO_START_TIME); - response.addError(error); - entities.remove(entityId); - entityInsertTimes.remove(entityId); - continue; - } else { - Long min = Long.MAX_VALUE; - for (TimelineEvent e : entity.getEvents()) { - if (min > e.getTimestamp()) { - min = e.getTimestamp(); - } - } - existingEntity.setStartTime(min); - } - } - if (entity.getPrimaryFilters() != null) { - if (existingEntity.getPrimaryFilters() == null) { - existingEntity.setPrimaryFilters(new HashMap>()); - } - for (Entry> pf : - entity.getPrimaryFilters().entrySet()) { - for (Object pfo : pf.getValue()) { - existingEntity.addPrimaryFilter(pf.getKey(), maybeConvert(pfo)); - } - } - } - if (entity.getOtherInfo() != null) { - if (existingEntity.getOtherInfo() == null) { - existingEntity.setOtherInfo(new HashMap()); - } - for (Entry info : entity.getOtherInfo().entrySet()) { - existingEntity.addOtherInfo(info.getKey(), - maybeConvert(info.getValue())); - } - } - // relate it to other entities - if (entity.getRelatedEntities() == null) { - continue; - } - for (Map.Entry> partRelatedEntities : entity - .getRelatedEntities().entrySet()) { - if (partRelatedEntities == null) { - continue; - } - for (String idStr : partRelatedEntities.getValue()) { - EntityIdentifier relatedEntityId = - new EntityIdentifier(idStr, partRelatedEntities.getKey()); - TimelineEntity relatedEntity = entities.get(relatedEntityId); - if (relatedEntity != null) { - if (relatedEntity.getDomainId().equals( - existingEntity.getDomainId())) { - relatedEntity.addRelatedEntity( - existingEntity.getEntityType(), existingEntity.getEntityId()); - } else { - // in this case the entity will be put, but the relation will be - // ignored - TimelinePutError error = new TimelinePutError(); - error.setEntityType(existingEntity.getEntityType()); - error.setEntityId(existingEntity.getEntityId()); - error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION); - response.addError(error); - } - } else { - relatedEntity = new TimelineEntity(); - relatedEntity.setEntityId(relatedEntityId.getId()); - relatedEntity.setEntityType(relatedEntityId.getType()); - 
relatedEntity.setStartTime(existingEntity.getStartTime()); - relatedEntity.addRelatedEntity(existingEntity.getEntityType(), - existingEntity.getEntityId()); - relatedEntity.setDomainId(existingEntity.getDomainId()); - entities.put(relatedEntityId, relatedEntity); - entityInsertTimes.put(relatedEntityId, System.currentTimeMillis()); - } - } - } - } - return response; - } - - public void put(TimelineDomain domain) throws IOException { - TimelineDomain domainToReplace = - domainsById.get(domain.getId()); - Long currentTimestamp = System.currentTimeMillis(); - TimelineDomain domainToStore = createTimelineDomain( - domain.getId(), domain.getDescription(), domain.getOwner(), - domain.getReaders(), domain.getWriters(), - (domainToReplace == null ? - currentTimestamp : domainToReplace.getCreatedTime()), - currentTimestamp); - domainsById.put(domainToStore.getId(), domainToStore); - Set domainsByOneOwner = - domainsByOwner.get(domainToStore.getOwner()); - if (domainsByOneOwner == null) { - domainsByOneOwner = new HashSet(); - domainsByOwner.put(domainToStore.getOwner(), domainsByOneOwner); - } - if (domainToReplace != null) { - domainsByOneOwner.remove(domainToReplace); - } - domainsByOneOwner.add(domainToStore); - } - - private static TimelineDomain createTimelineDomain( - String id, String description, String owner, - String readers, String writers, - Long createdTime, Long modifiedTime) { - TimelineDomain domainToStore = new TimelineDomain(); - domainToStore.setId(id); - domainToStore.setDescription(description); - domainToStore.setOwner(owner); - domainToStore.setReaders(readers); - domainToStore.setWriters(writers); - domainToStore.setCreatedTime(createdTime); - domainToStore.setModifiedTime(modifiedTime); - return domainToStore; - } - - private static TimelineEntity maskFields( - TimelineEntity entity, EnumSet fields) { - // Conceal the fields that are not going to be exposed - TimelineEntity entityToReturn = new TimelineEntity(); - entityToReturn.setEntityId(entity.getEntityId()); - entityToReturn.setEntityType(entity.getEntityType()); - entityToReturn.setStartTime(entity.getStartTime()); - entityToReturn.setDomainId(entity.getDomainId()); - // Deep copy - if (fields.contains(Field.EVENTS)) { - entityToReturn.addEvents(entity.getEvents()); - } else if (fields.contains(Field.LAST_EVENT_ONLY)) { - entityToReturn.addEvent(entity.getEvents().get(0)); - } else { - entityToReturn.setEvents(null); - } - if (fields.contains(Field.RELATED_ENTITIES)) { - entityToReturn.addRelatedEntities(entity.getRelatedEntities()); - } else { - entityToReturn.setRelatedEntities(null); - } - if (fields.contains(Field.PRIMARY_FILTERS)) { - entityToReturn.addPrimaryFilters(entity.getPrimaryFilters()); - } else { - entityToReturn.setPrimaryFilters(null); - } - if (fields.contains(Field.OTHER_INFO)) { - entityToReturn.addOtherInfo(entity.getOtherInfo()); - } else { - entityToReturn.setOtherInfo(null); - } - return entityToReturn; - } - - private static boolean matchFilter(Map tags, - NameValuePair filter) { - Object value = tags.get(filter.getName()); - if (value == null) { // doesn't have the filter - return false; - } else if (!value.equals(filter.getValue())) { // doesn't match the filter - return false; - } - return true; - } - - private static boolean matchPrimaryFilter(Map> tags, - NameValuePair filter) { - Set value = tags.get(filter.getName()); - if (value == null) { // doesn't have the filter - return false; - } else { - return value.contains(filter.getValue()); - } - } - - private static Object 
maybeConvert(Object o) { - if (o instanceof Long) { - Long l = (Long)o; - if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) { - return l.intValue(); - } - } - return o; + public MemoryTimelineStore(String name) { + super(name); + entities = new HashMapStoreAdapter<>(); + entityInsertTimes = new HashMapStoreAdapter<>(); + domainById = new HashMapStoreAdapter<>(); + domainsByOwner = new HashMapStoreAdapter<>(); } }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java new file mode 100644 index 00000000000..175ed0b4457 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import java.util.Iterator; + +/** + * An adapter for map-based timeline store implementations. + * @param <K> the type of the key set + * @param <V> the type of the value set + */ +interface TimelineStoreMapAdapter<K, V> { + /** + * @param key the key to look up + * @return the value mapped to key, or null if absent + */ + V get(K key); + + /** + * Add the mapping key->value to the map + * @param key the key + * @param value the value + */ + void put(K key, V value); + + /** + * Remove the mapping with key keyToRemove + * @param keyToRemove the key to remove + */ + void remove(K keyToRemove); + + /** + * @return an iterator over the value set of the map + */ + Iterator<V> valueSetIterator(); + + /** + * Return an iterator over the value set of the map, starting from minV if + * type V is comparable. + * @param minV the lower bound of the iteration + * @return an iterator starting at minV, or over all values otherwise + */ + Iterator<V> valueSetIterator(V minV); +}
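These two iterators are what KeyValueBasedTimelineStore.getEntities() relies on for full scans and fromId paging. A condensed sketch of that call pattern (variable names are illustrative; the real method returns an empty result when a supplied fromId does not resolve):

    // Resume iteration at the entity identified by fromId, or scan all
    // values in their natural order when no fromId was supplied.
    TimelineEntity first =
        entities.get(new EntityIdentifier(fromId, entityType));
    Iterator<TimelineEntity> it = (first == null)
        ? entities.valueSetIterator()
        : entities.valueSetIterator(first);
    while (it.hasNext()) {
      // apply type, window, filter, and ACL checks; stop at the limit
    }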
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java index 56385812e06..82c7f265e76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timeline.util; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.WritableComparator; import java.io.IOException; @@ -177,4 +178,10 @@ public static boolean prefixMatches(byte[] prefix, int prefixlen, prefixlen) == 0; } + /** + * Default permission mask for the LevelDB directory. + */ + public static final FsPermission LEVELDB_DIR_UMASK = FsPermission + .createImmutable((short) 0700); + }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml index 4fc08b0ce7b..bc689e94a87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml @@ -132,5 +132,9 @@ <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> </dependencies>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java index 37a1d8dad88..efbf994bbb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -102,7 +102,8 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, } if (!appLogs.getDetailLogs().isEmpty()) { if (store == null) { - store = new MemoryTimelineStore(); + store = new LevelDBCacheTimelineStore(groupId.toString(), + "LeveldbCache."
+ groupId); store.init(config); store.start(); }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java new file mode 100644 index 00000000000..976241f37db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * LevelDB implementation of {@link KeyValueBasedTimelineStore}. This + * implementation stores the entity hash map in a LevelDB instance. + * There are two partitions of the key space. One partition is to store an + * entity id to start time mapping: + * + * i!ENTITY_ID!ENTITY_TYPE -> ENTITY_START_TIME + * + * The other partition is to store the actual data: + * + * e!START_TIME!ENTITY_ID!ENTITY_TYPE -> ENTITY_BYTES + * + * This storage does not have any garbage collection mechanism, and is designed + * mainly for caching use cases.
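A sketch of how the two key layouts are assembled, mirroring the getStartTimeKey() and getEntityKey() helpers defined later in this file (startTime is written reverse-ordered, as in put(), so a forward LevelDB scan yields newest entities first):

    // Index key: i!ENTITY_ID!ENTITY_TYPE -> reverse-ordered start time.
    byte[] indexKey = LeveldbUtils.KeyBuilder.newInstance()
        .add("i").add(entityId.getId()).add(entityId.getType()).getBytes();
    // Entity key: e!START_TIME!ENTITY_ID!ENTITY_TYPE -> entity bytes.
    byte[] startTimeBytes =
        GenericObjectMapper.writeReverseOrderedLong(startTime);
    byte[] entityKey = LeveldbUtils.KeyBuilder.newInstance()
        .add("e").add(startTimeBytes, true)
        .add(entityId.getId()).add(entityId.getType()).getBytes();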
+ */ +@Private +@Unstable +public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore { + private static final Log LOG + = LogFactory.getLog(LevelDBCacheTimelineStore.class); + private static final String CACHED_LDB_FILE_PREFIX = "-timeline-cache.ldb"; + private String dbId; + private DB entityDb; + private Configuration configuration; + + public LevelDBCacheTimelineStore(String id, String name) { + super(name); + dbId = id; + entityInsertTimes = new MemoryTimelineStore.HashMapStoreAdapter<>(); + domainById = new MemoryTimelineStore.HashMapStoreAdapter<>(); + domainsByOwner = new MemoryTimelineStore.HashMapStoreAdapter<>(); + } + + public LevelDBCacheTimelineStore(String id) { + this(id, LevelDBCacheTimelineStore.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + configuration = conf; + Options options = new Options(); + options.createIfMissing(true); + options.cacheSize(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE, + YarnConfiguration. + DEFAULT_TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE)); + JniDBFactory factory = new JniDBFactory(); + Path dbPath = new Path( + conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), + dbId + CACHED_LDB_FILE_PREFIX); + FileSystem localFS = null; + + try { + localFS = FileSystem.getLocal(conf); + if (!localFS.exists(dbPath)) { + if (!localFS.mkdirs(dbPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + dbPath); + } + localFS.setPermission(dbPath, LeveldbUtils.LEVELDB_DIR_UMASK); + } + } finally { + IOUtils.cleanup(LOG, localFS); + } + LOG.info("Using leveldb path " + dbPath); + entityDb = factory.open(new File(dbPath.toString()), options); + entities = new LevelDBMapAdapter<>(entityDb); + + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStop() throws Exception { + IOUtils.cleanup(LOG, entityDb); + Path dbPath = new Path( + configuration.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), + dbId + CACHED_LDB_FILE_PREFIX); + FileSystem localFS = null; + try { + localFS = FileSystem.getLocal(configuration); + if (!localFS.delete(dbPath, true)) { + throw new IOException("Couldn't delete data file for leveldb " + + "timeline store " + dbPath); + } + } finally { + IOUtils.cleanup(LOG, localFS); + } + super.serviceStop(); + } + + /** + * A specialized hash map storage that uses LevelDB for storing entity id to + * entity mappings. + * + * @param an {@link EntityIdentifier} typed hash key + * @param a {@link TimelineEntity} typed value + */ + static class LevelDBMapAdapter implements TimelineStoreMapAdapter { + private static final String TIME_INDEX_PREFIX = "i"; + private static final String ENTITY_STORAGE_PREFIX = "e"; + DB entityDb; + + public LevelDBMapAdapter(DB currLevelDb) { + entityDb = currLevelDb; + } + + @Override + public V get(K entityId) { + V result = null; + // Read the start time from the index + byte[] startTimeBytes = entityDb.get(getStartTimeKey(entityId)); + if (startTimeBytes == null) { + return null; + } + + // Build the key for the entity storage and read it + try { + result = getEntityForKey(getEntityKey(entityId, startTimeBytes)); + } catch (IOException e) { + LOG.error("GenericObjectMapper cannot read key from key " + + entityId.toString() + + " into an object. Read aborted! 
"); + LOG.error(e.getMessage()); + } + + return result; + } + + @Override + public void put(K entityId, V entity) { + Long startTime = entity.getStartTime(); + if (startTime == null) { + startTime = System.currentTimeMillis(); + } + // Build the key for the entity storage and read it + byte[] startTimeBytes = GenericObjectMapper.writeReverseOrderedLong( + startTime); + try { + byte[] valueBytes = GenericObjectMapper.write(entity); + entityDb.put(getEntityKey(entityId, startTimeBytes), valueBytes); + } catch (IOException e) { + LOG.error("GenericObjectMapper cannot write " + + entity.getClass().getName() + + " into a byte array. Write aborted! "); + LOG.error(e.getMessage()); + } + + // Build the key for the start time index + entityDb.put(getStartTimeKey(entityId), startTimeBytes); + } + + @Override + public void remove(K entityId) { + // Read the start time from the index (key starts with an "i") then delete + // the record + LeveldbUtils.KeyBuilder startTimeKeyBuilder + = LeveldbUtils.KeyBuilder.newInstance(); + startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId()) + .add(entityId.getType()); + byte[] startTimeBytes = entityDb.get(startTimeKeyBuilder.getBytes()); + if (startTimeBytes == null) { + return; + } + entityDb.delete(startTimeKeyBuilder.getBytes()); + + // Build the key for the entity storage and delete it + entityDb.delete(getEntityKey(entityId, startTimeBytes)); + } + + @Override + public Iterator valueSetIterator() { + return getIterator(null, Long.MAX_VALUE); + } + + @Override + public Iterator valueSetIterator(V minV) { + return getIterator( + new EntityIdentifier(minV.getEntityId(), minV.getEntityType()), + minV.getStartTime()); + } + + private Iterator getIterator( + EntityIdentifier startId, long startTimeMax) { + + final DBIterator internalDbIterator = entityDb.iterator(); + + // we need to iterate from the first element with key greater than or + // equal to ENTITY_STORAGE_PREFIX!maxTS(!startId), but stop on the first + // key who does not have prefix ENTITY_STORATE_PREFIX + + // decide end prefix + LeveldbUtils.KeyBuilder entityPrefixKeyBuilder + = LeveldbUtils.KeyBuilder.newInstance(); + entityPrefixKeyBuilder.add(ENTITY_STORAGE_PREFIX); + final byte[] prefixBytes = entityPrefixKeyBuilder.getBytesForLookup(); + // decide start prefix on top of end prefix and seek + final byte[] startTimeBytes + = GenericObjectMapper.writeReverseOrderedLong(startTimeMax); + entityPrefixKeyBuilder.add(startTimeBytes, true); + if (startId != null) { + entityPrefixKeyBuilder.add(startId.getId()); + } + final byte[] startPrefixBytes + = entityPrefixKeyBuilder.getBytesForLookup(); + internalDbIterator.seek(startPrefixBytes); + + return new Iterator() { + @Override + public boolean hasNext() { + if (!internalDbIterator.hasNext()) { + return false; + } + Map.Entry nextEntry = internalDbIterator.peekNext(); + if (LeveldbUtils.prefixMatches( + prefixBytes, prefixBytes.length, nextEntry.getKey())) { + return true; + } + return false; + } + + @Override + public V next() { + if (hasNext()) { + Map.Entry nextRaw = internalDbIterator.next(); + try { + V result = getEntityForKey(nextRaw.getKey()); + return result; + } catch (IOException e) { + LOG.error("GenericObjectMapper cannot read key from key " + + nextRaw.getKey() + + " into an object. Read aborted! 
"); + LOG.error(e.getMessage()); + } + } + return null; + } + + // We do not support remove operations within one iteration + @Override + public void remove() { + LOG.error("LevelDB map adapter does not support iterate-and-remove" + + " use cases. "); + } + }; + } + + @SuppressWarnings("unchecked") + private V getEntityForKey(byte[] key) throws IOException { + byte[] resultRaw = entityDb.get(key); + if (resultRaw == null) { + return null; + } + ObjectMapper entityMapper = new ObjectMapper(); + return (V) entityMapper.readValue(resultRaw, TimelineEntity.class); + } + + private byte[] getStartTimeKey(K entityId) { + LeveldbUtils.KeyBuilder startTimeKeyBuilder + = LeveldbUtils.KeyBuilder.newInstance(); + startTimeKeyBuilder.add(TIME_INDEX_PREFIX).add(entityId.getId()) + .add(entityId.getType()); + return startTimeKeyBuilder.getBytes(); + } + + private byte[] getEntityKey(K entityId, byte[] startTimeBytes) { + LeveldbUtils.KeyBuilder entityKeyBuilder + = LeveldbUtils.KeyBuilder.newInstance(); + entityKeyBuilder.add(ENTITY_STORAGE_PREFIX).add(startTimeBytes, true) + .add(entityId.getId()).add(entityId.getType()); + return entityKeyBuilder.getBytes(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java index e0379b1a811..f529b59fd6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java @@ -200,7 +200,7 @@ static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore s } static TimelineDataManager getTdmWithMemStore(Configuration config) { - TimelineStore store = new MemoryTimelineStore(); + TimelineStore store = new MemoryTimelineStore("MemoryStore.test"); TimelineDataManager tdm = getTdmWithStore(config, store); return tdm; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java new file mode 100644 index 00000000000..66da1e0e274 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLevelDBCacheTimelineStore.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class TestLevelDBCacheTimelineStore extends TimelineStoreTestUtils { + + @Before + public void setup() throws Exception { + store = new LevelDBCacheTimelineStore("app1"); + store.init(new YarnConfiguration()); + store.start(); + loadTestEntityData(); + loadVerificationEntityData(); + loadTestDomainData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + } + + public TimelineStore getTimelineStore() { + return store; + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + super.testGetEntitiesWithFromTs(); + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + + @Test + public void testGetDomain() throws IOException { + super.testGetDomain(); + } + + @Test + public void testGetDomains() throws IOException { + super.testGetDomains(); + } + +}
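Beyond the JUnit harness above, the store can be exercised standalone; a minimal driver sketch under stated assumptions (the path, the "app1" id, and the 4 MB read-cache override are illustrative, not defaults from this patch):

    // Stand up the cache store, use it through the TimelineStore API, then
    // stop it; serviceStop() also deletes the backing LevelDB directory.
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        "/tmp/timeline-ldb-cache");
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_CACHE_READ_CACHE_SIZE,
        4 * 1024 * 1024);
    LevelDBCacheTimelineStore store = new LevelDBCacheTimelineStore("app1");
    store.init(conf);
    store.start();
    // ... store.put(...) and store.getEntity(...) as in the tests above ...
    store.stop();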