YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe

(cherry picked from commit 02f597c5db)
(cherry picked from commit 4a30a44b11)
This commit is contained in:
Junping Du 2016-01-17 17:37:40 -08:00
parent e75ca52138
commit a04f29ca90
22 changed files with 2574 additions and 10 deletions

View File

@ -211,6 +211,9 @@ Release 2.8.0 - UNRELEASED
YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
junping_du)
YARN-4265. Provide new timeline plugin storage to support fine-grained entity
caching. (Li Lu and Jason Lowe via junping_du)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -1546,6 +1546,7 @@ private static void addDeprecatedKeys() {
public static final String TIMELINE_SERVICE_VERSION = TIMELINE_SERVICE_PREFIX
+ "version";
public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;
/**
* Comma seperated list of names for UIs hosted in the timeline server
* (For pluggable UIs).
@ -1581,6 +1582,63 @@ private static void addDeprecatedKeys() {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
"/tmp/entity-file-history/active";
public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "done-dir";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT =
"/tmp/entity-file-history/done";
public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "scan-interval-seconds";
public static final long
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT = 60;
public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "threads";
public static final int
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT = 16;
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE
= TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "app-cache-size";
public static final int
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT = 10;
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "cleaner-interval-seconds";
public static final int
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT =
60 * 60;
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS
= TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retain-seconds";
public static final int
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT =
7 * 24 * 60 * 60;
// how old the most recent log of an UNKNOWN app needs to be in the active
// directory before we treat it as COMPLETED
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "unknown-active-seconds";
public static final int
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
= 24 * 60 * 60;
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
@ -1588,10 +1646,6 @@ private static void addDeprecatedKeys() {
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
"2000, 500";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
public static final long

View File

@ -1978,6 +1978,64 @@
<value>${hadoop.tmp.dir}/yarn/timeline</value>
</property>
<!-- Timeline Service v1.5 Configuration -->
<property>
<name>yarn.timeline-service.entity-group-fs-store.active-dir</name>
<value>/tmp/entity-file-history/active</value>
<description>HDFS path to store active applications timeline data</description>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.done-dir</name>
<value>/tmp/entity-file-history/done/</value>
<description>HDFS path to store done applications timeline data</description>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes</name>
<value></value>
<description>
Plugins that can translate a timeline entity read request into
a list of timeline entity group ids, separated by commas.
</description>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.summary-store</name>
<description>Summary storage for ATS v1.5</description>
<value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.scan-interval-seconds</name>
<description>
Scan interval for ATS v1.5 entity group file system storage reader.This
value controls how frequent the reader will scan the HDFS active directory
for application status.
</description>
<value>60</value>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.cleaner-interval-seconds</name>
<description>
Scan interval for ATS v1.5 entity group file system storage cleaner.This
value controls how frequent the reader will scan the HDFS done directory
for stale application data.
</description>
<value>3600</value>
</property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.retain-seconds</name>
<description>
How long the ATS v1.5 entity group file system storage will keep an
application's data in the done directory.
</description>
<value>604800</value>
</property>
<!-- Shared Cache Configuration -->
<property>

View File

@ -220,6 +220,17 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -233,8 +233,9 @@ private TimelineStore createTimelineStore(
}
private TimelineDataManager createTimelineDataManager(Configuration conf) {
return new TimelineDataManager(
timelineStore, new TimelineACLsManager(conf));
TimelineACLsManager aclsMgr = new TimelineACLsManager(conf);
aclsMgr.setTimelineStore(timelineStore);
return new TimelineDataManager(timelineStore, aclsMgr);
}
@SuppressWarnings("unchecked")

View File

@ -67,7 +67,6 @@ public TimelineDataManager(TimelineStore store,
super(TimelineDataManager.class.getName());
this.store = store;
this.timelineACLsManager = timelineACLsManager;
timelineACLsManager.setTimelineStore(store);
}
@Override

View File

@ -92,12 +92,17 @@ public long totalOps() {
getDomainsOps.value();
}
private static TimelineDataManagerMetrics instance = null;
TimelineDataManagerMetrics() {
}
public static TimelineDataManagerMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(new TimelineDataManagerMetrics());
public static synchronized TimelineDataManagerMetrics create() {
if (instance == null) {
MetricsSystem ms = DefaultMetricsSystem.instance();
instance = ms.register(new TimelineDataManagerMetrics());
}
return instance;
}
public void incrGetEntitiesOps() {

View File

@ -65,6 +65,7 @@ public static void setup() throws Exception {
TimelineStore store =
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
aclsManager.setTimelineStore(store);
dataManager =
new TimelineDataManager(store, aclsManager);
dataManager.init(conf);

View File

@ -96,6 +96,7 @@ public static TimelineStore createStore(int scale) throws Exception {
public void setup() throws Exception {
// Only test the ACLs of the generic history
TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration());
aclsManager.setTimelineStore(store);
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
dataManager.init(conf);

View File

@ -90,6 +90,7 @@ public static void setupClass() throws Exception {
TimelineStore store =
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
aclsManager.setTimelineStore(store);
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

View File

@ -62,6 +62,7 @@ public void setup() throws Exception {
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
aclsManager = new TimelineACLsManager(conf);
aclsManager.setTimelineStore(store);
dataManaer = new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");

View File

@ -0,0 +1,136 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
<version>3.0.0-SNAPSHOT</version>
<name>Apache Hadoop YARN Timeline Plugin Storage</name>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-el</groupId>
<artifactId>commons-el</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1-jetty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Cache item for timeline server v1.5 reader cache. Each cache item has a
* TimelineStore that can be filled with data within one entity group.
*/
public class EntityCacheItem {
private static final Logger LOG
= LoggerFactory.getLogger(EntityCacheItem.class);
private TimelineStore store;
private EntityGroupFSTimelineStore.AppLogs appLogs;
private long lastRefresh;
private Configuration config;
private FileSystem fs;
public EntityCacheItem(Configuration config, FileSystem fs) {
this.config = config;
this.fs = fs;
}
/**
* @return The application log associated to this cache item, may be null.
*/
public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
return this.appLogs;
}
/**
* Set the application logs to this cache item. The entity group should be
* associated with this application.
*
* @param incomingAppLogs
*/
public synchronized void setAppLogs(
EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
this.appLogs = incomingAppLogs;
}
/**
* @return The timeline store, either loaded or unloaded, of this cache item.
*/
public synchronized TimelineStore getStore() {
return store;
}
/**
* Refresh this cache item if it needs refresh. This will enforce an appLogs
* rescan and then load new data. The refresh process is synchronized with
* other operations on the same cache item.
*
* @param groupId
* @param aclManager
* @param jsonFactory
* @param objMapper
* @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
* object filled with all entities in the group.
* @throws IOException
*/
public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
TimelineACLsManager aclManager, JsonFactory jsonFactory,
ObjectMapper objMapper) throws IOException {
if (needRefresh()) {
// If an application is not finished, we only update summary logs (and put
// new entities into summary storage).
// Otherwise, since the application is done, we can update detail logs.
if (!appLogs.isDone()) {
appLogs.parseSummaryLogs();
} else if (appLogs.getDetailLogs().isEmpty()) {
appLogs.scanForLogs();
}
if (!appLogs.getDetailLogs().isEmpty()) {
if (store == null) {
store = new MemoryTimelineStore();
store.init(config);
store.start();
}
TimelineDataManager tdm = new TimelineDataManager(store,
aclManager);
tdm.init(config);
tdm.start();
List<LogInfo> removeList = new ArrayList<LogInfo>();
for (LogInfo log : appLogs.getDetailLogs()) {
LOG.debug("Try refresh logs for {}", log.getFilename());
// Only refresh the log that matches the cache id
if (log.matchesGroupId(groupId)) {
Path appDirPath = appLogs.getAppDirPath();
if (fs.exists(log.getPath(appDirPath))) {
LOG.debug("Refresh logs for cache id {}", groupId);
log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
objMapper, fs);
} else {
// The log may have been removed, remove the log
removeList.add(log);
LOG.info("File {} no longer exists, remove it from log list",
log.getPath(appDirPath));
}
}
}
appLogs.getDetailLogs().removeAll(removeList);
tdm.close();
}
updateRefreshTimeToNow();
} else {
LOG.debug("Cache new enough, skip refreshing");
}
return store;
}
/**
* Release the cache item for the given group id.
*
* @param groupId
*/
public synchronized void releaseCache(TimelineEntityGroupId groupId) {
try {
if (store != null) {
store.close();
}
} catch (IOException e) {
LOG.warn("Error closing timeline store", e);
}
store = null;
// reset offsets so next time logs are re-parsed
for (LogInfo log : appLogs.getDetailLogs()) {
if (log.getFilename().contains(groupId.toString())) {
log.setOffset(0);
}
}
}
private boolean needRefresh() {
return (Time.monotonicNow() - lastRefresh > 10000);
}
private void updateRefreshTimeToNow() {
this.lastRefresh = Time.monotonicNow();
}
}

View File

@ -0,0 +1,895 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.MappingJsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Plugin timeline storage to support timeline server v1.5 API. This storage
* uses a file system to store timeline entities in their groups.
*/
public class EntityGroupFSTimelineStore extends AbstractService
implements TimelineStore {
static final String DOMAIN_LOG_PREFIX = "domainlog-";
static final String SUMMARY_LOG_PREFIX = "summarylog-";
static final String ENTITY_LOG_PREFIX = "entitylog-";
private static final Logger LOG = LoggerFactory.getLogger(
EntityGroupFSTimelineStore.class);
private static final FsPermission ACTIVE_DIR_PERMISSION =
new FsPermission((short) 01777);
private static final FsPermission DONE_DIR_PERMISSION =
new FsPermission((short) 0700);
private static final EnumSet<YarnApplicationState>
APP_FINAL_STATES = EnumSet.of(
YarnApplicationState.FAILED,
YarnApplicationState.KILLED,
YarnApplicationState.FINISHED);
// Active dir: <activeRoot>/appId/attemptId/cacheId.log
// Done dir: <doneRoot>/cluster_ts/hash1/hash2/appId/attemptId/cacheId.log
private static final String APP_DONE_DIR_PREFIX_FORMAT =
"%d" + Path.SEPARATOR // cluster timestamp
+ "%04d" + Path.SEPARATOR // app num / 1,000,000
+ "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
+ "%s" + Path.SEPARATOR; // full app id
private YarnClient yarnClient;
private TimelineStore summaryStore;
private TimelineACLsManager aclManager;
private TimelineDataManager summaryTdm;
private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
new ConcurrentHashMap<ApplicationId, AppLogs>();
private ScheduledThreadPoolExecutor executor;
private FileSystem fs;
private ObjectMapper objMapper;
private JsonFactory jsonFactory;
private Path activeRootPath;
private Path doneRootPath;
private long logRetainMillis;
private long unknownActiveMillis;
private int appCacheMaxSize = 0;
private List<TimelineEntityGroupPlugin> cacheIdPlugins;
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
public EntityGroupFSTimelineStore() {
super(EntityGroupFSTimelineStore.class.getSimpleName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
summaryStore = createSummaryStore();
summaryStore.init(conf);
long logRetainSecs = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT);
logRetainMillis = logRetainSecs * 1000;
LOG.info("Cleaner set to delete logs older than {} seconds", logRetainSecs);
long unknownActiveSecs = conf.getLong(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS,
YarnConfiguration.
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
);
unknownActiveMillis = unknownActiveSecs * 1000;
LOG.info("Unknown apps will be treated as complete after {} seconds",
unknownActiveSecs);
appCacheMaxSize = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT);
LOG.info("Application cache size is {}", appCacheMaxSize);
cachedLogs = Collections.synchronizedMap(
new LinkedHashMap<TimelineEntityGroupId, EntityCacheItem>(
appCacheMaxSize + 1, 0.75f, true) {
@Override
protected boolean removeEldestEntry(
Map.Entry<TimelineEntityGroupId, EntityCacheItem> eldest) {
if (super.size() > appCacheMaxSize) {
TimelineEntityGroupId groupId = eldest.getKey();
LOG.debug("Evicting {} due to space limitations", groupId);
EntityCacheItem cacheItem = eldest.getValue();
cacheItem.releaseCache(groupId);
if (cacheItem.getAppLogs().isDone()) {
appIdLogMap.remove(groupId.getApplicationId());
}
return true;
}
return false;
}
});
cacheIdPlugins = loadPlugIns(conf);
// Initialize yarn client for application status
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
super.serviceInit(conf);
}
private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
throws RuntimeException {
Collection<String> pluginNames = conf.getStringCollection(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
List<TimelineEntityGroupPlugin> pluginList
= new LinkedList<TimelineEntityGroupPlugin>();
for (final String name : pluginNames) {
LOG.debug("Trying to load plugin class {}", name);
TimelineEntityGroupPlugin cacheIdPlugin = null;
try {
Class<?> clazz = conf.getClassByName(name);
cacheIdPlugin =
(TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
clazz, conf);
} catch (Exception e) {
LOG.warn("Error loading plugin " + name, e);
}
if (cacheIdPlugin == null) {
throw new RuntimeException("No class defined for " + name);
}
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
pluginList.add(cacheIdPlugin);
}
return pluginList;
}
private TimelineStore createSummaryStore() {
return ReflectionUtils.newInstance(getConfig().getClass(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
LeveldbTimelineStore.class, TimelineStore.class), getConfig());
}
@Override
protected void serviceStart() throws Exception {
LOG.info("Starting {}", getName());
yarnClient.start();
summaryStore.start();
Configuration conf = getConfig();
aclManager = new TimelineACLsManager(conf);
aclManager.setTimelineStore(summaryStore);
summaryTdm = new TimelineDataManager(summaryStore, aclManager);
summaryTdm.init(conf);
summaryTdm.start();
activeRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
doneRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
fs = activeRootPath.getFileSystem(conf);
if (!fs.exists(activeRootPath)) {
fs.mkdirs(activeRootPath);
fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
}
if (!fs.exists(doneRootPath)) {
fs.mkdirs(doneRootPath);
fs.setPermission(doneRootPath, DONE_DIR_PERMISSION);
}
objMapper = new ObjectMapper();
objMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
jsonFactory = new MappingJsonFactory(objMapper);
final long scanIntervalSecs = conf.getLong(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
);
final long cleanerIntervalSecs = conf.getLong(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT
);
final int numThreads = conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
executor = new ScheduledThreadPoolExecutor(numThreads,
new ThreadFactoryBuilder().setNameFormat("EntityLogPluginWorker #%d")
.build());
executor.scheduleAtFixedRate(new EntityLogScanner(), 0, scanIntervalSecs,
TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
cleanerIntervalSecs, TimeUnit.SECONDS);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping {}", getName());
if (executor != null) {
executor.shutdown();
if (executor.isTerminating()) {
LOG.info("Waiting for executor to terminate");
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
if (terminated) {
LOG.info("Executor terminated");
} else {
LOG.warn("Executor did not terminate");
executor.shutdownNow();
}
}
}
if (summaryTdm != null) {
summaryTdm.stop();
}
if (summaryStore != null) {
summaryStore.stop();
}
if (yarnClient != null) {
yarnClient.stop();
}
synchronized (cachedLogs) {
for (EntityCacheItem cacheItem : cachedLogs.values()) {
cacheItem.getStore().close();
}
}
super.serviceStop();
}
@InterfaceAudience.Private
@VisibleForTesting
void scanActiveLogs() throws IOException {
RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
while (iter.hasNext()) {
FileStatus stat = iter.next();
ApplicationId appId = parseApplicationId(stat.getPath().getName());
if (appId != null) {
LOG.debug("scan logs for {} in {}", appId, stat.getPath());
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
executor.execute(new ActiveLogParser(logs));
}
}
}
private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
Path appDirPath, AppState appState) {
AppLogs appLogs = new AppLogs(appId, appDirPath, appState);
AppLogs oldAppLogs = appIdLogMap.putIfAbsent(appId, appLogs);
if (oldAppLogs != null) {
appLogs = oldAppLogs;
}
return appLogs;
}
private AppLogs getAndSetActiveLog(ApplicationId appId, Path appDirPath) {
AppLogs appLogs = appIdLogMap.get(appId);
if (appLogs == null) {
appLogs = createAndPutAppLogsIfAbsent(appId, appDirPath, AppState.ACTIVE);
}
return appLogs;
}
// searches for the app logs and returns it if found else null
private AppLogs getAndSetAppLogs(ApplicationId applicationId)
throws IOException {
LOG.debug("Looking for app logs mapped for app id {}", applicationId);
AppLogs appLogs = appIdLogMap.get(applicationId);
if (appLogs == null) {
AppState appState = AppState.UNKNOWN;
Path appDirPath = getDoneAppPath(applicationId);
if (fs.exists(appDirPath)) {
appState = AppState.COMPLETED;
} else {
appDirPath = getActiveAppPath(applicationId);
if (fs.exists(appDirPath)) {
appState = AppState.ACTIVE;
}
}
if (appState != AppState.UNKNOWN) {
LOG.debug("Create and try to add new appLogs to appIdLogMap for {}",
applicationId);
appLogs = createAndPutAppLogsIfAbsent(
applicationId, appDirPath, appState);
}
}
return appLogs;
}
/**
* Main function for entity log cleaner. This method performs depth first
* search from a given dir path for all application log dirs. Once found, it
* will decide if the directory should be cleaned up and then clean them.
*
* @param dirpath the root directory the cleaner should start with. Note that
* dirpath should be a directory that contains a set of
* application log directories. The cleaner method will not
* work if the given dirpath itself is an application log dir.
* @param fs
* @param retainMillis
* @throws IOException
*/
@InterfaceAudience.Private
@VisibleForTesting
static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
throws IOException {
long now = Time.now();
// Depth first search from root directory for all application log dirs
RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
while (iter.hasNext()) {
FileStatus stat = iter.next();
if (stat.isDirectory()) {
// If current is an application log dir, decide if we need to remove it
// and remove if necessary.
// Otherwise, keep iterating into it.
ApplicationId appId = parseApplicationId(dirpath.getName());
if (appId != null) { // Application log dir
if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) {
try {
LOG.info("Deleting {}", dirpath);
if (!fs.delete(dirpath, true)) {
LOG.error("Unable to remove " + dirpath);
}
} catch (IOException e) {
LOG.error("Unable to remove " + dirpath, e);
}
}
} else { // Keep cleaning inside
cleanLogs(stat.getPath(), fs, retainMillis);
}
}
}
}
private static boolean shouldCleanAppLogDir(Path appLogPath, long now,
FileSystem fs, long logRetainMillis) throws IOException {
RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath);
while (iter.hasNext()) {
FileStatus stat = iter.next();
if (now - stat.getModificationTime() <= logRetainMillis) {
// found a dir entry that is fresh enough to prevent
// cleaning this directory.
LOG.debug("{} not being cleaned due to {}", appLogPath, stat.getPath());
return false;
}
// Otherwise, keep searching files inside for directories.
if (stat.isDirectory()) {
if (!shouldCleanAppLogDir(stat.getPath(), now, fs, logRetainMillis)) {
return false;
}
}
}
return true;
}
// converts the String to an ApplicationId or null if conversion failed
private static ApplicationId parseApplicationId(String appIdStr) {
ApplicationId appId = null;
if (appIdStr.startsWith(ApplicationId.appIdStrPrefix)) {
try {
appId = ConverterUtils.toApplicationId(appIdStr);
} catch (IllegalArgumentException e) {
appId = null;
}
}
return appId;
}
private Path getActiveAppPath(ApplicationId appId) {
return new Path(activeRootPath, appId.toString());
}
private Path getDoneAppPath(ApplicationId appId) {
// cut up the app ID into mod(1000) buckets
int appNum = appId.getId();
appNum /= 1000;
int bucket2 = appNum % 1000;
int bucket1 = appNum / 1000;
return new Path(doneRootPath,
String.format(APP_DONE_DIR_PREFIX_FORMAT, appId.getClusterTimestamp(),
bucket1, bucket2, appId.toString()));
}
// This method has to be synchronized to control traffic to RM
private static synchronized AppState getAppState(ApplicationId appId,
YarnClient yarnClient) throws IOException {
AppState appState = AppState.ACTIVE;
try {
ApplicationReport report = yarnClient.getApplicationReport(appId);
YarnApplicationState yarnState = report.getYarnApplicationState();
if (APP_FINAL_STATES.contains(yarnState)) {
appState = AppState.COMPLETED;
}
} catch (ApplicationNotFoundException e) {
appState = AppState.UNKNOWN;
} catch (YarnException e) {
throw new IOException(e);
}
return appState;
}
@InterfaceAudience.Private
@VisibleForTesting
enum AppState {
ACTIVE,
UNKNOWN,
COMPLETED
}
class AppLogs {
private ApplicationId appId;
private Path appDirPath;
private AppState appState;
private List<LogInfo> summaryLogs = new ArrayList<LogInfo>();
private List<LogInfo> detailLogs = new ArrayList<LogInfo>();
public AppLogs(ApplicationId appId, Path appPath, AppState state) {
this.appId = appId;
appDirPath = appPath;
appState = state;
}
public synchronized boolean isDone() {
return appState == AppState.COMPLETED;
}
public synchronized ApplicationId getAppId() {
return appId;
}
public synchronized Path getAppDirPath() {
return appDirPath;
}
synchronized List<LogInfo> getSummaryLogs() {
return summaryLogs;
}
synchronized List<LogInfo> getDetailLogs() {
return detailLogs;
}
public synchronized void parseSummaryLogs() throws IOException {
parseSummaryLogs(summaryTdm);
}
@InterfaceAudience.Private
@VisibleForTesting
synchronized void parseSummaryLogs(TimelineDataManager tdm)
throws IOException {
if (!isDone()) {
LOG.debug("Try to parse summary log for log {} in {}",
appId, appDirPath);
appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
long recentLogModTime = scanForLogs();
if (appState == AppState.UNKNOWN) {
if (Time.now() - recentLogModTime > unknownActiveMillis) {
LOG.info(
"{} state is UNKNOWN and logs are stale, assuming COMPLETED",
appId);
appState = AppState.COMPLETED;
}
}
}
List<LogInfo> removeList = new ArrayList<LogInfo>();
for (LogInfo log : summaryLogs) {
if (fs.exists(log.getPath(appDirPath))) {
log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
objMapper, fs);
} else {
// The log may have been removed, remove the log
removeList.add(log);
LOG.info("File {} no longer exists, remove it from log list",
log.getPath(appDirPath));
}
}
summaryLogs.removeAll(removeList);
}
// scans for new logs and returns the modification timestamp of the
// most recently modified log
@InterfaceAudience.Private
@VisibleForTesting
long scanForLogs() throws IOException {
LOG.debug("scanForLogs on {}", appDirPath);
long newestModTime = 0;
RemoteIterator<FileStatus> iterAttempt =
fs.listStatusIterator(appDirPath);
while (iterAttempt.hasNext()) {
FileStatus statAttempt = iterAttempt.next();
LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
if (!statAttempt.isDirectory()
|| !statAttempt.getPath().getName()
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
LOG.debug("Scanner skips for unknown dir/file {}",
statAttempt.getPath());
continue;
}
String attemptDirName = statAttempt.getPath().getName();
RemoteIterator<FileStatus> iterCache
= fs.listStatusIterator(statAttempt.getPath());
while (iterCache.hasNext()) {
FileStatus statCache = iterCache.next();
if (!statCache.isFile()) {
continue;
}
String filename = statCache.getPath().getName();
// We should only update time for log files.
boolean shouldSetTime = true;
LOG.debug("scan for log file: {}", filename);
if (filename.startsWith(DOMAIN_LOG_PREFIX)) {
addSummaryLog(attemptDirName, filename, statCache.getOwner(), true);
} else if (filename.startsWith(SUMMARY_LOG_PREFIX)) {
addSummaryLog(attemptDirName, filename, statCache.getOwner(),
false);
} else if (filename.startsWith(ENTITY_LOG_PREFIX)) {
addDetailLog(attemptDirName, filename, statCache.getOwner());
} else {
shouldSetTime = false;
}
if (shouldSetTime) {
newestModTime
= Math.max(statCache.getModificationTime(), newestModTime);
}
}
}
// if there are no logs in the directory then use the modification
// time of the directory itself
if (newestModTime == 0) {
newestModTime = fs.getFileStatus(appDirPath).getModificationTime();
}
return newestModTime;
}
private void addSummaryLog(String attemptDirName,
String filename, String owner, boolean isDomainLog) {
for (LogInfo log : summaryLogs) {
if (log.getFilename().equals(filename)
&& log.getAttemptDirName().equals(attemptDirName)) {
return;
}
}
LOG.debug("Incoming log {} not present in my summaryLogs list, add it",
filename);
LogInfo log;
if (isDomainLog) {
log = new DomainLogInfo(attemptDirName, filename, owner);
} else {
log = new EntityLogInfo(attemptDirName, filename, owner);
}
summaryLogs.add(log);
}
private void addDetailLog(String attemptDirName, String filename,
String owner) {
for (LogInfo log : detailLogs) {
if (log.getFilename().equals(filename)
&& log.getAttemptDirName().equals(attemptDirName)) {
return;
}
}
detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
}
public synchronized void moveToDone() throws IOException {
Path doneAppPath = getDoneAppPath(appId);
if (!doneAppPath.equals(appDirPath)) {
Path donePathParent = doneAppPath.getParent();
if (!fs.exists(donePathParent)) {
fs.mkdirs(donePathParent);
}
LOG.debug("Application {} is done, trying to move to done dir {}",
appId, doneAppPath);
if (!fs.rename(appDirPath, doneAppPath)) {
throw new IOException("Rename " + appDirPath + " to " + doneAppPath
+ " failed");
} else {
LOG.info("Moved {} to {}", appDirPath, doneAppPath);
}
appDirPath = doneAppPath;
}
}
}
private class EntityLogScanner implements Runnable {
@Override
public void run() {
LOG.debug("Active scan starting");
try {
scanActiveLogs();
} catch (Exception e) {
LOG.error("Error scanning active files", e);
}
LOG.debug("Active scan complete");
}
}
private class ActiveLogParser implements Runnable {
private AppLogs appLogs;
public ActiveLogParser(AppLogs logs) {
appLogs = logs;
}
@Override
public void run() {
try {
LOG.debug("Begin parsing summary logs. ");
appLogs.parseSummaryLogs();
if (appLogs.isDone()) {
appLogs.moveToDone();
appIdLogMap.remove(appLogs.getAppId());
}
LOG.debug("End parsing summary logs. ");
} catch (Exception e) {
LOG.error("Error processing logs for " + appLogs.getAppId(), e);
}
}
}
private class EntityLogCleaner implements Runnable {
@Override
public void run() {
LOG.debug("Cleaner starting");
try {
cleanLogs(doneRootPath, fs, logRetainMillis);
} catch (Exception e) {
LOG.error("Error cleaning files", e);
}
LOG.debug("Cleaner finished");
}
}
@InterfaceAudience.Private
@VisibleForTesting
void setFs(FileSystem incomingFs) {
this.fs = incomingFs;
}
@InterfaceAudience.Private
@VisibleForTesting
void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
cachedLogs.put(groupId, cacheItem);
}
private List<TimelineStore> getTimelineStoresFromCacheIds(
Set<TimelineEntityGroupId> groupIds, String entityType)
throws IOException {
List<TimelineStore> stores = new LinkedList<TimelineStore>();
// For now we just handle one store in a context. We return the first
// non-null storage for the group ids.
for (TimelineEntityGroupId groupId : groupIds) {
TimelineStore storeForId = getCachedStore(groupId);
if (storeForId != null) {
LOG.debug("Adding {} as a store for the query", storeForId.getName());
stores.add(storeForId);
}
}
if (stores.size() == 0) {
LOG.debug("Using summary store for {}", entityType);
stores.add(this.summaryStore);
}
return stores;
}
private List<TimelineStore> getTimelineStoresForRead(String entityId,
String entityType) throws IOException {
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
LOG.debug("Trying plugin {} for id {} and type {}",
cacheIdPlugin.getClass().getName(), entityId, entityType);
Set<TimelineEntityGroupId> idsFromPlugin
= cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType);
if (idsFromPlugin == null) {
LOG.debug("Plugin returned null " + cacheIdPlugin.getClass().getName());
} else {
LOG.debug("Plugin returned ids: " + idsFromPlugin);
}
if (idsFromPlugin != null) {
groupIds.addAll(idsFromPlugin);
LOG.debug("plugin {} returns a non-null value on query",
cacheIdPlugin.getClass().getName());
}
}
return getTimelineStoresFromCacheIds(groupIds, entityType);
}
private List<TimelineStore> getTimelineStoresForRead(String entityType,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
throws IOException {
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
Set<TimelineEntityGroupId> idsFromPlugin =
cacheIdPlugin.getTimelineEntityGroupId(entityType, primaryFilter,
secondaryFilters);
if (idsFromPlugin != null) {
LOG.debug("plugin {} returns a non-null value on query {}",
cacheIdPlugin.getClass().getName(), idsFromPlugin);
groupIds.addAll(idsFromPlugin);
}
}
return getTimelineStoresFromCacheIds(groupIds, entityType);
}
// find a cached timeline store or null if it cannot be located
private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
throws IOException {
EntityCacheItem cacheItem;
synchronized (this.cachedLogs) {
// Note that the content in the cache log storage may be stale.
cacheItem = this.cachedLogs.get(groupId);
if (cacheItem == null) {
LOG.debug("Set up new cache item for id {}", groupId);
cacheItem = new EntityCacheItem(getConfig(), fs);
AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
if (appLogs != null) {
LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
cacheItem.setAppLogs(appLogs);
this.cachedLogs.put(groupId, cacheItem);
} else {
LOG.warn("AppLogs for groupId {} is set to null!", groupId);
}
}
}
TimelineStore store = null;
if (cacheItem.getAppLogs() != null) {
AppLogs appLogs = cacheItem.getAppLogs();
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
objMapper);
} else {
LOG.warn("AppLogs for group id {} is null", groupId);
}
return store;
}
@Override
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
List<TimelineStore> stores = getTimelineStoresForRead(entityType,
primaryFilter, secondaryFilters);
TimelineEntities returnEntities = new TimelineEntities();
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {} for the request", store.getName());
returnEntities.addEntities(
store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
checkAcl).getEntities());
}
return returnEntities;
}
@Override
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) throws IOException {
LOG.debug("getEntity type={} id={}", entityType, entityId);
List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) {
return e;
}
}
LOG.debug("getEntity: Found nothing");
return null;
}
@Override
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventTypes) throws IOException {
LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
TimelineEvents returnEvents = new TimelineEvents();
for (String entityId : entityIds) {
LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
List<TimelineStore> stores
= getTimelineStoresForRead(entityId, entityType);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
SortedSet<String> entityIdSet = new TreeSet<>();
entityIdSet.add(entityId);
TimelineEvents events =
store.getEntityTimelines(entityType, entityIdSet, limit,
windowStart, windowEnd, eventTypes);
returnEvents.addEvents(events.getAllEvents());
}
}
return returnEvents;
}
@Override
public TimelineDomain getDomain(String domainId) throws IOException {
return summaryStore.getDomain(domainId);
}
@Override
public TimelineDomains getDomains(String owner) throws IOException {
return summaryStore.getDomains(owner);
}
@Override
public TimelinePutResponse put(TimelineEntities data) throws IOException {
return summaryStore.put(data);
}
@Override
public void put(TimelineDomain domain) throws IOException {
summaryStore.put(domain);
}
}

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.MappingIterator;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
abstract class LogInfo {
public static final String ENTITY_FILE_NAME_DELIMITERS = "_.";
public String getAttemptDirName() {
return attemptDirName;
}
public long getOffset() {
return offset;
}
public void setOffset(long newOffset) {
this.offset = newOffset;
}
private String attemptDirName;
private String filename;
private String user;
private long offset = 0;
private static final Logger LOG = LoggerFactory.getLogger(LogInfo.class);
public LogInfo(String attemptDirName, String file, String owner) {
this.attemptDirName = attemptDirName;
filename = file;
user = owner;
}
public Path getPath(Path rootPath) {
Path attemptPath = new Path(rootPath, attemptDirName);
return new Path(attemptPath, filename);
}
public String getFilename() {
return filename;
}
public boolean matchesGroupId(TimelineEntityGroupId groupId) {
return matchesGroupId(groupId.toString());
}
@InterfaceAudience.Private
@VisibleForTesting
boolean matchesGroupId(String groupId){
// Return true if the group id is a segment (separated by _, ., or end of
// string) of the file name.
int pos = filename.indexOf(groupId);
if (pos < 0) {
return false;
}
return filename.length() == pos + groupId.length()
|| ENTITY_FILE_NAME_DELIMITERS.contains(String.valueOf(
filename.charAt(pos + groupId.length())
));
}
public void parseForStore(TimelineDataManager tdm, Path appDirPath,
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
FileSystem fs) throws IOException {
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
attemptDirName);
Path logPath = getPath(appDirPath);
if (fs.exists(logPath)) {
long startTime = Time.monotonicNow();
try {
LOG.debug("Parsing {} at offset {}", logPath, offset);
long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
objMapper, fs);
LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime);
} catch (RuntimeException e) {
if (e.getCause() instanceof JsonParseException) {
// If AppLogs cannot parse this log, it may be corrupted
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
}
}
} else {
LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
}
}
private long parsePath(TimelineDataManager tdm, Path logPath,
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
FileSystem fs) throws IOException {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(user);
FSDataInputStream in = fs.open(logPath);
JsonParser parser = null;
try {
in.seek(offset);
try {
parser = jsonFactory.createJsonParser(in);
parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted) {
throw e;
} else {
LOG.debug("Exception in parse path: {}", e.getMessage());
return 0;
}
}
return doParse(tdm, parser, objMapper, ugi, appCompleted);
} finally {
IOUtils.closeStream(parser);
IOUtils.closeStream(in);
}
}
protected abstract long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException;
}
class EntityLogInfo extends LogInfo {
private static final Logger LOG = LoggerFactory.getLogger(
EntityGroupFSTimelineStore.class);
public EntityLogInfo(String attemptId,
String file, String owner) {
super(attemptId, file, owner);
}
@Override
protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
TimelineEntities entities = new TimelineEntities();
ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean postError = false;
try {
MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
TimelineEntity.class);
while (iter.hasNext()) {
TimelineEntity entity = iter.next();
String etype = entity.getEntityType();
String eid = entity.getEntityId();
LOG.trace("Read entity {}", etype);
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
try {
LOG.debug("Adding {}({}) to store", eid, etype);
entityList.add(entity);
entities.setEntities(entityList);
TimelinePutResponse response = tdm.postEntities(entities, ugi);
for (TimelinePutResponse.TimelinePutError e
: response.getErrors()) {
LOG.warn("Error putting entity: {} ({}): {}",
e.getEntityId(), e.getEntityType(), e.getErrorCode());
}
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
entityList.clear();
} catch (YarnException e) {
postError = true;
throw new IOException("Error posting entities", e);
} catch (IOException e) {
postError = true;
throw new IOException("Error posting entities", e);
}
}
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted || postError) {
throw e;
}
} catch (RuntimeException e) {
if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
throw e;
}
}
return count;
}
}
class DomainLogInfo extends LogInfo {
private static final Logger LOG = LoggerFactory.getLogger(
EntityGroupFSTimelineStore.class);
public DomainLogInfo(String attemptDirName, String file,
String owner) {
super(attemptDirName, file, owner);
}
protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean putError = false;
try {
MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
TimelineDomain.class);
while (iter.hasNext()) {
TimelineDomain domain = iter.next();
domain.setOwner(ugi.getShortUserName());
LOG.trace("Read domain {}", domain.getId());
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
try {
tdm.putDomain(domain, ugi);
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
} catch (YarnException e) {
putError = true;
throw new IOException("Error posting domain", e);
} catch (IOException e) {
putError = true;
throw new IOException("Error posting domain", e);
}
}
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted || putError) {
throw e;
}
} catch (RuntimeException e) {
if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
throw e;
}
}
return count;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
/**
* Plugin to map a requested query ( or an Entity/set of Entities ) to a CacheID.
* The Cache ID is an identifier to the data set that needs to be queried to
* serve the response for the query.
*/
public abstract class TimelineEntityGroupPlugin {
/**
* Get the {@link TimelineEntityGroupId}s for the data sets that need to be
* scanned to serve the query.
*
* @param entityType Entity Type being queried
* @param primaryFilter Primary filter being applied
* @param secondaryFilters Secondary filters being applied in the query
* @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}
*/
public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
String entityType, NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters);
/**
* Get the {@link TimelineEntityGroupId}s for the data sets that need to be
* scanned to serve the query.
*
* @param entityType Entity Type being queried
* @param entityId Entity Id being requested
* @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}
*/
public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
String entityId,
String entityType);
/**
* Get the {@link TimelineEntityGroupId}s for the data sets that need to be
* scanned to serve the query.
*
* @param entityType Entity Type being queried
* @param entityIds Entity Ids being requested
* @param eventTypes Event Types being requested
* @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}
*/
public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
String entityType, SortedSet<String> entityIds,
Set<String> eventTypes);
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
private static TimelineEntityGroupId timelineEntityGroupId
= TimelineEntityGroupId.newInstance(
TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters) {
return Sets.newHashSet(timelineEntityGroupId);
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
String entityType) {
return Sets.newHashSet(timelineEntityGroupId);
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
SortedSet<String> entityIds,
Set<String> eventTypes) {
return Sets.newHashSet(timelineEntityGroupId);
}
static TimelineEntityGroupId getStandardTimelineGroupId() {
return timelineEntityGroupId;
}
}

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
class PluginStoreTestUtils {
static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
throws IOException {
FSDataOutputStream stream;
stream = fs.create(logPath, true);
return stream;
}
static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false);
return mapper;
}
/**
* Create sample entities for testing
* @return two timeline entities in a {@link TimelineEntities} object
*/
static TimelineEntities generateTestEntities() {
TimelineEntities entities = new TimelineEntities();
Map<String, Set<Object>> primaryFilters =
new HashMap<String, Set<Object>>();
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add(Integer.MAX_VALUE);
Set<Object> l3 = new HashSet<Object>();
l3.add("123abc");
Set<Object> l4 = new HashSet<Object>();
l4.add((long)Integer.MAX_VALUE + 1l);
primaryFilters.put("user", l1);
primaryFilters.put("appname", l2);
primaryFilters.put("other", l3);
primaryFilters.put("long", l4);
Map<String, Object> secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456);
secondaryFilters.put("status", "RUNNING");
Map<String, Object> otherInfo1 = new HashMap<String, Object>();
otherInfo1.put("info1", "val1");
otherInfo1.putAll(secondaryFilters);
String entityId1 = "id_1";
String entityType1 = "type_1";
String entityId2 = "id_2";
String entityType2 = "type_2";
Map<String, Set<String>> relatedEntities =
new HashMap<String, Set<String>>();
relatedEntities.put(entityType2, Collections.singleton(entityId2));
TimelineEvent ev3 = createEvent(789l, "launch_event", null);
TimelineEvent ev4 = createEvent(0l, "init_event", null);
List<TimelineEvent> events = new ArrayList<TimelineEvent>();
events.add(ev3);
events.add(ev4);
entities.addEntity(createEntity(entityId2, entityType2, 456l, events, null,
null, null, "domain_id_1"));
TimelineEvent ev1 = createEvent(123l, "start_event", null);
entities.addEntity(createEntity(entityId1, entityType1, 123l,
Collections.singletonList(ev1), relatedEntities, primaryFilters,
otherInfo1, "domain_id_1"));
return entities;
}
static void verifyTestEntities(TimelineDataManager tdm)
throws YarnException, IOException {
TimelineEntity entity1 = tdm.getEntity("type_1", "id_1",
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
TimelineEntity entity2 = tdm.getEntity("type_2", "id_2",
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity1);
assertNotNull(entity2);
assertEquals("Failed to read out entity 1",
(Long) 123l, entity1.getStartTime());
assertEquals("Failed to read out entity 2",
(Long) 456l, entity2.getStartTime());
}
/**
* Create a test entity
*/
static TimelineEntity createEntity(String entityId, String entityType,
Long startTime, List<TimelineEvent> events,
Map<String, Set<String>> relatedEntities,
Map<String, Set<Object>> primaryFilters,
Map<String, Object> otherInfo, String domainId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(entityId);
entity.setEntityType(entityType);
entity.setStartTime(startTime);
entity.setEvents(events);
if (relatedEntities != null) {
for (Map.Entry<String, Set<String>> e : relatedEntities.entrySet()) {
for (String v : e.getValue()) {
entity.addRelatedEntity(e.getKey(), v);
}
}
} else {
entity.setRelatedEntities(null);
}
entity.setPrimaryFilters(primaryFilters);
entity.setOtherInfo(otherInfo);
entity.setDomainId(domainId);
return entity;
}
/**
* Create a test event
*/
static TimelineEvent createEvent(long timestamp, String type, Map<String,
Object> info) {
TimelineEvent event = new TimelineEvent();
event.setTimestamp(timestamp);
event.setEventType(type);
event.setEventInfo(info);
return event;
}
/**
* Write timeline entities to a file system
* @param entities
* @param logPath
* @param fs
* @throws IOException
*/
static void writeEntities(TimelineEntities entities, Path logPath,
FileSystem fs) throws IOException {
FSDataOutputStream outStream = createLogFile(logPath, fs);
JsonGenerator jsonGenerator
= (new JsonFactory()).createJsonGenerator(outStream);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
ObjectMapper objMapper = createObjectMapper();
for (TimelineEntity entity : entities.getEntities()) {
objMapper.writeValue(jsonGenerator, entity);
}
outStream.close();
}
static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore store) {
TimelineACLsManager aclManager = new TimelineACLsManager(config);
TimelineDataManager tdm = new TimelineDataManager(store, aclManager);
tdm.init(config);
return tdm;
}
static TimelineDataManager getTdmWithMemStore(Configuration config) {
TimelineStore store = new MemoryTimelineStore();
TimelineDataManager tdm = getTdmWithStore(config, store);
return tdm;
}
}

View File

@ -0,0 +1,332 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static final String SAMPLE_APP_NAME = "1234_5678";
static final ApplicationId TEST_APPLICATION_ID
= ConverterUtils.toApplicationId(
ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
private static final String TEST_APP_DIR_NAME
= TEST_APPLICATION_ID.toString();
private static final String TEST_ATTEMPT_DIR_NAME
= ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
private static final String TEST_SUMMARY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
private static final String TEST_ENTITY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId();
private static final String TEST_DOMAIN_LOG_FILE_NAME
= EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
private static final Path TEST_ROOT_DIR
= new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestEntityGroupFSTimelineStore.class.getSimpleName());
private static final Path TEST_APP_DIR_PATH
= new Path(TEST_ROOT_DIR, TEST_APP_DIR_NAME);
private static final Path TEST_ATTEMPT_DIR_PATH
= new Path(TEST_APP_DIR_PATH, TEST_ATTEMPT_DIR_NAME);
private static final Path TEST_DONE_DIR_PATH
= new Path(TEST_ROOT_DIR, "done");
private static Configuration config = new YarnConfiguration();
private static MiniDFSCluster hdfsCluster;
private static FileSystem fs;
private EntityGroupFSTimelineStore store;
private TimelineEntity entityNew;
@Rule
public TestName currTestName = new TestName();
@BeforeClass
public static void setupClass() throws Exception {
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
config.set(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
TEST_DONE_DIR_PATH.toString());
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster
= new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
}
@Before
public void setup() throws Exception {
createTestFiles();
store = new EntityGroupFSTimelineStore();
if (currTestName.getMethodName().contains("Plugin")) {
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
}
store.init(config);
store.start();
store.setFs(fs);
}
@After
public void tearDown() throws Exception {
fs.delete(TEST_APP_DIR_PATH, true);
store.stop();
}
@AfterClass
public static void tearDownClass() throws Exception {
hdfsCluster.shutdown();
FileContext fileContext = FileContext.getLocalFSFileContext();
fileContext.delete(new Path(
config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true);
}
@Test
public void testAppLogsScanLogs() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
appLogs.scanForLogs();
List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
List<LogInfo> detailLogs = appLogs.getDetailLogs();
assertEquals(2, summaryLogs.size());
assertEquals(1, detailLogs.size());
for (LogInfo log : summaryLogs) {
String fileName = log.getFilename();
assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME)
|| fileName.equals(TEST_DOMAIN_LOG_FILE_NAME));
}
for (LogInfo log : detailLogs) {
String fileName = log.getFilename();
assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
}
}
@Test
public void testMoveToDone() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
Path pathBefore = appLogs.getAppDirPath();
appLogs.moveToDone();
Path pathAfter = appLogs.getAppDirPath();
assertNotEquals(pathBefore, pathAfter);
assertTrue(pathAfter.toString().contains(TEST_DONE_DIR_PATH.toString()));
}
@Test
public void testParseSummaryLogs() throws Exception {
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm);
PluginStoreTestUtils.verifyTestEntities(tdm);
}
@Test
public void testCleanLogs() throws Exception {
// Create test dirs and files
// Irrelevant file, should not be reclaimed
Path irrelevantFilePath = new Path(
TEST_DONE_DIR_PATH, "irrelevant.log");
FSDataOutputStream stream = fs.create(irrelevantFilePath);
stream.close();
// Irrelevant directory, should not be reclaimed
Path irrelevantDirPath = new Path(TEST_DONE_DIR_PATH, "irrelevant");
fs.mkdirs(irrelevantDirPath);
Path doneAppHomeDir = new Path(new Path(TEST_DONE_DIR_PATH, "0000"), "001");
// First application, untouched after creation
Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirClean);
Path filePath = new Path(attemptDirClean, "test.log");
stream = fs.create(filePath);
stream.close();
// Second application, one file touched after creation
Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
Path attemptDirHoldByFile
= new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirHoldByFile);
Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
stream = fs.create(filePathHold);
stream.close();
// Third application, one dir touched after creation
Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirHoldByDir);
Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
fs.mkdirs(dirPathHold);
// Fourth application, empty dirs
Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirEmpty);
Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
fs.mkdirs(dirPathEmpty);
// Should retain all logs after this run
EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePath));
assertTrue(fs.exists(filePathHold));
assertTrue(fs.exists(dirPathHold));
assertTrue(fs.exists(dirPathEmpty));
// Make sure the created dir is old enough
Thread.sleep(2000);
// Touch the second application
stream = fs.append(filePathHold);
stream.writeBytes("append");
stream.close();
// Touch the third application by creating a new dir
fs.mkdirs(new Path(dirPathHold, "holdByMe"));
EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
// Verification after the second cleaner call
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePathHold));
assertTrue(fs.exists(dirPathHold));
assertTrue(fs.exists(doneAppHomeDir));
// appDirClean and appDirEmpty should be cleaned up
assertFalse(fs.exists(appDirClean));
assertFalse(fs.exists(appDirEmpty));
}
@Test
public void testPluginRead() throws Exception {
// Verify precondition
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
// Generate TDM
TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store);
// Verify single entity read
TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
// Verify multiple entities read
TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
for (TimelineEntity entity : entities.getEntities()) {
assertEquals(entityNew.getStartTime(), entity.getStartTime());
}
}
@Test
public void testSummaryRead() throws Exception {
// Load data
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store);
appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm);
// Verify single entity read
PluginStoreTestUtils.verifyTestEntities(tdm);
// Verify multiple entities read
TimelineEntities entities = tdm.getEntities("type_1", null, null, null,
null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
for (TimelineEntity entity : entities.getEntities()) {
assertEquals((Long) 123l, entity.getStartTime());
}
}
private void createTestFiles() throws IOException {
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
PluginStoreTestUtils.writeEntities(entities,
new Path(TEST_ATTEMPT_DIR_PATH, TEST_SUMMARY_LOG_FILE_NAME), fs);
entityNew = PluginStoreTestUtils
.createEntity("id_3", "type_3", 789l, null, null,
null, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
PluginStoreTestUtils.writeEntities(entityList,
new Path(TEST_ATTEMPT_DIR_PATH, TEST_ENTITY_LOG_FILE_NAME), fs);
FSDataOutputStream out = fs.create(
new Path(TEST_ATTEMPT_DIR_PATH, TEST_DOMAIN_LOG_FILE_NAME));
out.close();
}
}

View File

@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.EnumSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestLogInfo {
private static final Path TEST_ROOT_DIR = new Path(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestLogInfo.class.getSimpleName());
private static final String TEST_ATTEMPT_DIR_NAME = "test_app";
private static final String TEST_ENTITY_FILE_NAME = "test_entity";
private static final String TEST_DOMAIN_FILE_NAME = "test_domain";
private static final String TEST_BROKEN_FILE_NAME = "test_broken";
private Configuration config = new YarnConfiguration();
private MiniDFSCluster hdfsCluster;
private FileSystem fs;
private ObjectMapper objMapper;
private JsonFactory jsonFactory = new JsonFactory();
private JsonGenerator jsonGenerator;
private FSDataOutputStream outStream = null;
private FSDataOutputStream outStreamDomain = null;
private TimelineDomain testDomain;
private static final short FILE_LOG_DIR_PERMISSIONS = 0770;
@Before
public void setup() throws Exception {
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
Path testAppDirPath = new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS));
objMapper = PluginStoreTestUtils.createObjectMapper();
TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities();
writeEntitiesLeaveOpen(testEntities,
new Path(testAppDirPath, TEST_ENTITY_FILE_NAME));
testDomain = new TimelineDomain();
testDomain.setId("domain_1");
testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName());
testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName());
testDomain.setDescription("description");
writeDomainLeaveOpen(testDomain,
new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME));
writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME));
}
@After
public void tearDown() throws Exception {
jsonGenerator.close();
outStream.close();
outStreamDomain.close();
hdfsCluster.shutdown();
}
@Test
public void testMatchesGroupId() throws Exception {
String testGroupId = "app1_group1";
// Match
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
"app1_group1",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
"test_app1_group1",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
// Unmatch
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
// Check delimiters
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
// Check file names shorter than group id
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
}
@Test
public void testParseEntity() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_ENTITY_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
// Verify for the first batch
PluginStoreTestUtils.verifyTestEntities(tdm);
// Load new data
TimelineEntity entityNew = PluginStoreTestUtils
.createEntity("id_3", "type_3", 789l, null, null,
null, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
writeEntitiesLeaveOpen(entityList,
new Path(new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME),
TEST_ENTITY_FILE_NAME));
testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
// Verify the newly added data
TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(),
entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals("Failed to read out entity new",
entityNew.getStartTime(), entity3.getStartTime());
tdm.close();
}
@Test
public void testParseBrokenEntity() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_BROKEN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_BROKEN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
// Try parse, should not fail
testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
tdm.close();
}
@Test
public void testParseDomain() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_DOMAIN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
// Verify domain data
TimelineDomain resultDomain = tdm.getDomain("domain_1",
UserGroupInformation.getLoginUser());
assertNotNull(resultDomain);
assertEquals(testDomain.getReaders(), resultDomain.getReaders());
assertEquals(testDomain.getOwner(), resultDomain.getOwner());
assertEquals(testDomain.getDescription(), resultDomain.getDescription());
}
private void writeBrokenFile(Path logPath) throws IOException {
FSDataOutputStream out = null;
try {
String broken = "{ broken { [[]} broken";
out = PluginStoreTestUtils.createLogFile(logPath, fs);
out.write(broken.getBytes(Charset.forName("UTF-8")));
out.close();
out = null;
} finally {
if (out != null) {
out.close();
}
}
}
// TestLogInfo needs to maintain opened hdfs files so we have to build our own
// write methods
private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath)
throws IOException {
if (outStream == null) {
outStream = PluginStoreTestUtils.createLogFile(logPath, fs);
jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
}
for (TimelineEntity entity : entities.getEntities()) {
objMapper.writeValue(jsonGenerator, entity);
}
outStream.hflush();
}
private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath)
throws IOException {
if (outStreamDomain == null) {
outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs);
}
// Write domain uses its own json generator to isolate from entity writers
JsonGenerator jsonGeneratorLocal
= (new JsonFactory()).createJsonGenerator(outStreamDomain);
jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
objMapper.writeValue(jsonGeneratorLocal, domain);
outStreamDomain.hflush();
}
}

View File

@ -42,5 +42,6 @@
<module>hadoop-yarn-server-sharedcachemanager</module>
<module>hadoop-yarn-server-tests</module>
<module>hadoop-yarn-server-applicationhistoryservice</module>
<module>hadoop-yarn-server-timeline-pluginstorage</module>
</modules>
</project>