YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe
(cherry picked from commit 02f597c5db)
@@ -280,6 +280,9 @@ Release 2.8.0 - UNRELEASED
    YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
    junping_du)

    YARN-4265. Provide new timeline plugin storage to support fine-grained entity
    caching. (Li Lu and Jason Lowe via junping_du)

  IMPROVEMENTS

    YARN-644. Basic null check is not performed on passed in arguments before
@@ -1590,6 +1590,7 @@ public class YarnConfiguration extends Configuration {
  public static final String TIMELINE_SERVICE_VERSION = TIMELINE_SERVICE_PREFIX
      + "version";
  public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;

  /**
   * Comma separated list of names for UIs hosted in the timeline server
   * (For pluggable UIs).
@@ -1625,6 +1626,63 @@ public class YarnConfiguration extends Configuration {
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
      "/tmp/entity-file-history/active";

  public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "done-dir";
  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT =
      "/tmp/entity-file-history/done";

  public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes";

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store";

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "scan-interval-seconds";
  public static final long
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT = 60;

  public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "threads";
  public static final int
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT = 16;

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE
      = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "app-cache-size";
  public static final int
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT = 10;

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "cleaner-interval-seconds";
  public static final int
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT =
      60 * 60;

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS
      = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retain-seconds";
  public static final int
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT =
      7 * 24 * 60 * 60;

  // how old the most recent log of an UNKNOWN app needs to be in the active
  // directory before we treat it as COMPLETED
  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "unknown-active-seconds";
  public static final int
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
      = 24 * 60 * 60;

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
@@ -1632,10 +1690,6 @@ public class YarnConfiguration extends Configuration {
      DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
      "2000, 500";

  public static final String
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";

  public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
      TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
  public static final long
@@ -1978,6 +1978,64 @@
    <value>${hadoop.tmp.dir}/yarn/timeline</value>
  </property>

  <!-- Timeline Service v1.5 Configuration -->

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.active-dir</name>
    <value>/tmp/entity-file-history/active</value>
    <description>HDFS path to store active application's timeline data</description>
  </property>

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.done-dir</name>
    <value>/tmp/entity-file-history/done/</value>
    <description>HDFS path to store done application's timeline data</description>
  </property>

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes</name>
    <value></value>
    <description>
      Plugins that can translate a timeline entity read request into
      a list of timeline entity group ids, separated by commas.
    </description>
  </property>
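Illustration only, not part of this change: a minimal sketch of the kind of plugin this property points at. The two lookup shapes mirror the calls EntityGroupFSTimelineStore makes later in this diff; the package, class name, and entity-id format are hypothetical, and TimelineEntityGroupId.newInstance is assumed to be the usual YARN record factory. A real plugin would extend TimelineEntityGroupPlugin and use its framework's own naming scheme.

    package org.example.timeline;  // hypothetical package and class name

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Set;

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
    import org.apache.hadoop.yarn.server.timeline.NameValuePair;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    // Sketch: map every entity of an application to a single group whose id is
    // the application id. Assumes entity ids look like
    // "application_<ts>_<seq>!<suffix>" so the application id can be parsed
    // from the prefix.
    public class PerAppGroupPlugin {

      public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
          String entityId, String entityType) {
        String appIdStr = entityId.split("!")[0];
        ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
        return Collections.singleton(
            TimelineEntityGroupId.newInstance(appId, appIdStr));
      }

      public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
          NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters) {
        // Returning null means "no opinion"; the store then falls back to the
        // summary store for this query.
        return null;
      }
    }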

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.summary-store</name>
    <description>Summary storage for ATS v1.5</description>
    <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
  </property>

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.scan-interval-seconds</name>
    <description>
      Scan interval for the ATS v1.5 entity group file system storage reader. This
      value controls how frequently the reader scans the HDFS active directory
      for application status.
    </description>
    <value>60</value>
  </property>

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.cleaner-interval-seconds</name>
    <description>
      Scan interval for the ATS v1.5 entity group file system storage cleaner. This
      value controls how frequently the cleaner scans the HDFS done directory
      for stale application data.
    </description>
    <value>3600</value>
  </property>

  <property>
    <name>yarn.timeline-service.entity-group-fs-store.retain-seconds</name>
    <description>
      How long the ATS v1.5 entity group file system storage will keep an
      application's data in the done directory.
    </description>
    <value>604800</value>
  </property>
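For reference, a minimal sketch (not part of the patch; the helper class name is hypothetical) of setting the same v1.5 properties programmatically through the YarnConfiguration constants added earlier in this diff. The values simply repeat the defaults shown above.

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public final class TimelineV15ConfExample {
      // Builds a configuration with the entity group file system store
      // directories and retention period set to the documented defaults.
      public static YarnConfiguration withEntityGroupFsStoreDefaults() {
        YarnConfiguration conf = new YarnConfiguration();
        conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
            "/tmp/entity-file-history/active");
        conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
            "/tmp/entity-file-history/done");
        conf.setLong(
            YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
            7 * 24 * 60 * 60);  // 604800 seconds, i.e. one week
        return conf;
      }
    }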

  <!-- Shared Cache Configuration -->

  <property>
@@ -220,6 +220,17 @@
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
<phase>test-compile</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
@@ -228,8 +228,9 @@ public class ApplicationHistoryServer extends CompositeService {
}
|
||||
|
||||
private TimelineDataManager createTimelineDataManager(Configuration conf) {
|
||||
return new TimelineDataManager(
|
||||
timelineStore, new TimelineACLsManager(conf));
|
||||
TimelineACLsManager aclsMgr = new TimelineACLsManager(conf);
|
||||
aclsMgr.setTimelineStore(timelineStore);
|
||||
return new TimelineDataManager(timelineStore, aclsMgr);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@@ -67,7 +67,6 @@ public class TimelineDataManager extends AbstractService {
super(TimelineDataManager.class.getName());
|
||||
this.store = store;
|
||||
this.timelineACLsManager = timelineACLsManager;
|
||||
timelineACLsManager.setTimelineStore(store);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@@ -92,12 +92,17 @@ public class TimelineDataManagerMetrics {
getDomainsOps.value();
|
||||
}
|
||||
|
||||
private static TimelineDataManagerMetrics instance = null;
|
||||
|
||||
TimelineDataManagerMetrics() {
|
||||
}
|
||||
|
||||
public static TimelineDataManagerMetrics create() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
return ms.register(new TimelineDataManagerMetrics());
|
||||
public static synchronized TimelineDataManagerMetrics create() {
|
||||
if (instance == null) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
instance = ms.register(new TimelineDataManagerMetrics());
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public void incrGetEntitiesOps() {
|
||||
|
|
|
@@ -65,6 +65,7 @@ public class TestApplicationHistoryClientService {
TimelineStore store =
|
||||
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
|
||||
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
||||
aclsManager.setTimelineStore(store);
|
||||
dataManager =
|
||||
new TimelineDataManager(store, aclsManager);
|
||||
dataManager.init(conf);
|
||||
|
|
|
@@ -96,6 +96,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
public void setup() throws Exception {
|
||||
// Only test the ACLs of the generic history
|
||||
TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration());
|
||||
aclsManager.setTimelineStore(store);
|
||||
TimelineDataManager dataManager =
|
||||
new TimelineDataManager(store, aclsManager);
|
||||
dataManager.init(conf);
|
||||
|
|
|
@@ -90,6 +90,7 @@ public class TestAHSWebServices extends JerseyTestBase {
TimelineStore store =
|
||||
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
|
||||
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
||||
aclsManager.setTimelineStore(store);
|
||||
TimelineDataManager dataManager =
|
||||
new TimelineDataManager(store, aclsManager);
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
|
|
|
@@ -62,6 +62,7 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
||||
aclsManager = new TimelineACLsManager(conf);
|
||||
aclsManager.setTimelineStore(store);
|
||||
dataManaer = new TimelineDataManager(store, aclsManager);
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||
|
|
|
@@ -0,0 +1,136 @@
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>hadoop-yarn-server</artifactId>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<name>Apache Hadoop YARN Timeline Plugin Storage</name>
|
||||
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>commons-el</groupId>
|
||||
<artifactId>commons-el</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>tomcat</groupId>
|
||||
<artifactId>jasper-runtime</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>tomcat</groupId>
|
||||
<artifactId>jasper-compiler</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jsp-2.1-jetty</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-core-asl</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-xc</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@@ -0,0 +1,170 @@
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.timeline;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Cache item for timeline server v1.5 reader cache. Each cache item has a
|
||||
* TimelineStore that can be filled with data within one entity group.
|
||||
*/
|
||||
public class EntityCacheItem {
|
||||
private static final Logger LOG
|
||||
= LoggerFactory.getLogger(EntityCacheItem.class);
|
||||
|
||||
private TimelineStore store;
|
||||
private EntityGroupFSTimelineStore.AppLogs appLogs;
|
||||
private long lastRefresh;
|
||||
private Configuration config;
|
||||
private FileSystem fs;
|
||||
|
||||
public EntityCacheItem(Configuration config, FileSystem fs) {
|
||||
this.config = config;
|
||||
this.fs = fs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The application logs associated with this cache item; may be null.
|
||||
*/
|
||||
public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
|
||||
return this.appLogs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the application logs to this cache item. The entity group should be
|
||||
* associated with this application.
|
||||
*
|
||||
* @param incomingAppLogs
|
||||
*/
|
||||
public synchronized void setAppLogs(
|
||||
EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
|
||||
this.appLogs = incomingAppLogs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The timeline store, either loaded or unloaded, of this cache item.
|
||||
*/
|
||||
public synchronized TimelineStore getStore() {
|
||||
return store;
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh this cache item if it needs refresh. This will enforce an appLogs
|
||||
* rescan and then load new data. The refresh process is synchronized with
|
||||
* other operations on the same cache item.
|
||||
*
|
||||
* @param groupId
|
||||
* @param aclManager
|
||||
* @param jsonFactory
|
||||
* @param objMapper
|
||||
* @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
|
||||
* object filled with all entities in the group.
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
|
||||
TimelineACLsManager aclManager, JsonFactory jsonFactory,
|
||||
ObjectMapper objMapper) throws IOException {
|
||||
if (needRefresh()) {
|
||||
// If an application is not finished, we only update summary logs (and put
|
||||
// new entities into summary storage).
|
||||
// Otherwise, since the application is done, we can update detail logs.
|
||||
if (!appLogs.isDone()) {
|
||||
appLogs.parseSummaryLogs();
|
||||
} else if (appLogs.getDetailLogs().isEmpty()) {
|
||||
appLogs.scanForLogs();
|
||||
}
|
||||
if (!appLogs.getDetailLogs().isEmpty()) {
|
||||
if (store == null) {
|
||||
store = new MemoryTimelineStore();
|
||||
store.init(config);
|
||||
store.start();
|
||||
}
|
||||
TimelineDataManager tdm = new TimelineDataManager(store,
|
||||
aclManager);
|
||||
tdm.init(config);
|
||||
tdm.start();
|
||||
List<LogInfo> removeList = new ArrayList<LogInfo>();
|
||||
for (LogInfo log : appLogs.getDetailLogs()) {
|
||||
LOG.debug("Try refresh logs for {}", log.getFilename());
|
||||
// Only refresh the log that matches the cache id
|
||||
if (log.matchesGroupId(groupId)) {
|
||||
Path appDirPath = appLogs.getAppDirPath();
|
||||
if (fs.exists(log.getPath(appDirPath))) {
|
||||
LOG.debug("Refresh logs for cache id {}", groupId);
|
||||
log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
|
||||
objMapper, fs);
|
||||
} else {
|
||||
// The log may have been removed, remove the log
|
||||
removeList.add(log);
|
||||
LOG.info("File {} no longer exists, remove it from log list",
|
||||
log.getPath(appDirPath));
|
||||
}
|
||||
}
|
||||
}
|
||||
appLogs.getDetailLogs().removeAll(removeList);
|
||||
tdm.close();
|
||||
}
|
||||
updateRefreshTimeToNow();
|
||||
} else {
|
||||
LOG.debug("Cache new enough, skip refreshing");
|
||||
}
|
||||
return store;
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the cache item for the given group id.
|
||||
*
|
||||
* @param groupId
|
||||
*/
|
||||
public synchronized void releaseCache(TimelineEntityGroupId groupId) {
|
||||
try {
|
||||
if (store != null) {
|
||||
store.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error closing timeline store", e);
|
||||
}
|
||||
store = null;
|
||||
// reset offsets so next time logs are re-parsed
|
||||
for (LogInfo log : appLogs.getDetailLogs()) {
|
||||
if (log.getFilename().contains(groupId.toString())) {
|
||||
log.setOffset(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
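  // The cached store is treated as fresh for 10 seconds after the last
  // refresh; within that window refreshCache() skips re-reading logs and
  // simply returns the existing store.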
private boolean needRefresh() {
|
||||
return (Time.monotonicNow() - lastRefresh > 10000);
|
||||
}
|
||||
|
||||
private void updateRefreshTimeToNow() {
|
||||
this.lastRefresh = Time.monotonicNow();
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,895 @@
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timeline;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.map.MappingJsonFactory;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Plugin timeline storage to support timeline server v1.5 API. This storage
|
||||
* uses a file system to store timeline entities in their groups.
|
||||
*/
|
||||
public class EntityGroupFSTimelineStore extends AbstractService
|
||||
implements TimelineStore {
|
||||
|
||||
static final String DOMAIN_LOG_PREFIX = "domainlog-";
|
||||
static final String SUMMARY_LOG_PREFIX = "summarylog-";
|
||||
static final String ENTITY_LOG_PREFIX = "entitylog-";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
EntityGroupFSTimelineStore.class);
|
||||
private static final FsPermission ACTIVE_DIR_PERMISSION =
|
||||
new FsPermission((short) 01777);
|
||||
private static final FsPermission DONE_DIR_PERMISSION =
|
||||
new FsPermission((short) 0700);
|
||||
|
||||
private static final EnumSet<YarnApplicationState>
|
||||
APP_FINAL_STATES = EnumSet.of(
|
||||
YarnApplicationState.FAILED,
|
||||
YarnApplicationState.KILLED,
|
||||
YarnApplicationState.FINISHED);
|
||||
// Active dir: <activeRoot>/appId/attemptId/cacheId.log
|
||||
// Done dir: <doneRoot>/cluster_ts/hash1/hash2/appId/attemptId/cacheId.log
|
||||
private static final String APP_DONE_DIR_PREFIX_FORMAT =
|
||||
"%d" + Path.SEPARATOR // cluster timestamp
|
||||
+ "%04d" + Path.SEPARATOR // app num / 1,000,000
|
||||
+ "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
|
||||
+ "%s" + Path.SEPARATOR; // full app id
|
||||
|
||||
private YarnClient yarnClient;
|
||||
private TimelineStore summaryStore;
|
||||
private TimelineACLsManager aclManager;
|
||||
private TimelineDataManager summaryTdm;
|
||||
private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
|
||||
new ConcurrentHashMap<ApplicationId, AppLogs>();
|
||||
private ScheduledThreadPoolExecutor executor;
|
||||
private FileSystem fs;
|
||||
private ObjectMapper objMapper;
|
||||
private JsonFactory jsonFactory;
|
||||
private Path activeRootPath;
|
||||
private Path doneRootPath;
|
||||
private long logRetainMillis;
|
||||
private long unknownActiveMillis;
|
||||
private int appCacheMaxSize = 0;
|
||||
private List<TimelineEntityGroupPlugin> cacheIdPlugins;
|
||||
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
|
||||
|
||||
public EntityGroupFSTimelineStore() {
|
||||
super(EntityGroupFSTimelineStore.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
summaryStore = createSummaryStore();
|
||||
summaryStore.init(conf);
|
||||
long logRetainSecs = conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT);
|
||||
logRetainMillis = logRetainSecs * 1000;
|
||||
LOG.info("Cleaner set to delete logs older than {} seconds", logRetainSecs);
|
||||
long unknownActiveSecs = conf.getLong(
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS,
|
||||
YarnConfiguration.
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
|
||||
);
|
||||
unknownActiveMillis = unknownActiveSecs * 1000;
|
||||
LOG.info("Unknown apps will be treated as complete after {} seconds",
|
||||
unknownActiveSecs);
|
||||
appCacheMaxSize = conf.getInt(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT);
|
||||
LOG.info("Application cache size is {}", appCacheMaxSize);
|
||||
cachedLogs = Collections.synchronizedMap(
|
||||
new LinkedHashMap<TimelineEntityGroupId, EntityCacheItem>(
|
||||
appCacheMaxSize + 1, 0.75f, true) {
|
||||
@Override
|
||||
protected boolean removeEldestEntry(
|
||||
Map.Entry<TimelineEntityGroupId, EntityCacheItem> eldest) {
|
||||
if (super.size() > appCacheMaxSize) {
|
||||
TimelineEntityGroupId groupId = eldest.getKey();
|
||||
LOG.debug("Evicting {} due to space limitations", groupId);
|
||||
EntityCacheItem cacheItem = eldest.getValue();
|
||||
cacheItem.releaseCache(groupId);
|
||||
if (cacheItem.getAppLogs().isDone()) {
|
||||
appIdLogMap.remove(groupId.getApplicationId());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
});
|
||||
cacheIdPlugins = loadPlugIns(conf);
|
||||
// Initialize yarn client for application status
|
||||
yarnClient = YarnClient.createYarnClient();
|
||||
yarnClient.init(conf);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
|
||||
throws RuntimeException {
|
||||
Collection<String> pluginNames = conf.getStringCollection(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
|
||||
List<TimelineEntityGroupPlugin> pluginList
|
||||
= new LinkedList<TimelineEntityGroupPlugin>();
|
||||
for (final String name : pluginNames) {
|
||||
LOG.debug("Trying to load plugin class {}", name);
|
||||
TimelineEntityGroupPlugin cacheIdPlugin = null;
|
||||
try {
|
||||
Class<?> clazz = conf.getClassByName(name);
|
||||
cacheIdPlugin =
|
||||
(TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
|
||||
clazz, conf);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error loading plugin " + name, e);
|
||||
}
|
||||
|
||||
if (cacheIdPlugin == null) {
|
||||
throw new RuntimeException("No class defined for " + name);
|
||||
}
|
||||
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
|
||||
pluginList.add(cacheIdPlugin);
|
||||
}
|
||||
return pluginList;
|
||||
}
|
||||
|
||||
private TimelineStore createSummaryStore() {
|
||||
return ReflectionUtils.newInstance(getConfig().getClass(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
|
||||
LeveldbTimelineStore.class, TimelineStore.class), getConfig());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
LOG.info("Starting {}", getName());
|
||||
yarnClient.start();
|
||||
summaryStore.start();
|
||||
|
||||
Configuration conf = getConfig();
|
||||
aclManager = new TimelineACLsManager(conf);
|
||||
aclManager.setTimelineStore(summaryStore);
|
||||
summaryTdm = new TimelineDataManager(summaryStore, aclManager);
|
||||
summaryTdm.init(conf);
|
||||
summaryTdm.start();
|
||||
activeRootPath = new Path(conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
|
||||
doneRootPath = new Path(conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
|
||||
fs = activeRootPath.getFileSystem(conf);
|
||||
if (!fs.exists(activeRootPath)) {
|
||||
fs.mkdirs(activeRootPath);
|
||||
fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
|
||||
}
|
||||
if (!fs.exists(doneRootPath)) {
|
||||
fs.mkdirs(doneRootPath);
|
||||
fs.setPermission(doneRootPath, DONE_DIR_PERMISSION);
|
||||
}
|
||||
|
||||
objMapper = new ObjectMapper();
|
||||
objMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
|
||||
jsonFactory = new MappingJsonFactory(objMapper);
|
||||
final long scanIntervalSecs = conf.getLong(
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
|
||||
);
|
||||
final long cleanerIntervalSecs = conf.getLong(
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT
|
||||
);
|
||||
final int numThreads = conf.getInt(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
|
||||
LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
|
||||
LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
|
||||
|
||||
executor = new ScheduledThreadPoolExecutor(numThreads,
|
||||
new ThreadFactoryBuilder().setNameFormat("EntityLogPluginWorker #%d")
|
||||
.build());
|
||||
executor.scheduleAtFixedRate(new EntityLogScanner(), 0, scanIntervalSecs,
|
||||
TimeUnit.SECONDS);
|
||||
executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
|
||||
cleanerIntervalSecs, TimeUnit.SECONDS);
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Stopping {}", getName());
|
||||
if (executor != null) {
|
||||
executor.shutdown();
|
||||
if (executor.isTerminating()) {
|
||||
LOG.info("Waiting for executor to terminate");
|
||||
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
if (terminated) {
|
||||
LOG.info("Executor terminated");
|
||||
} else {
|
||||
LOG.warn("Executor did not terminate");
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (summaryTdm != null) {
|
||||
summaryTdm.stop();
|
||||
}
|
||||
if (summaryStore != null) {
|
||||
summaryStore.stop();
|
||||
}
|
||||
if (yarnClient != null) {
|
||||
yarnClient.stop();
|
||||
}
|
||||
synchronized (cachedLogs) {
|
||||
for (EntityCacheItem cacheItem : cachedLogs.values()) {
|
||||
cacheItem.getStore().close();
|
||||
}
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
void scanActiveLogs() throws IOException {
|
||||
RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
|
||||
while (iter.hasNext()) {
|
||||
FileStatus stat = iter.next();
|
||||
ApplicationId appId = parseApplicationId(stat.getPath().getName());
|
||||
if (appId != null) {
|
||||
LOG.debug("scan logs for {} in {}", appId, stat.getPath());
|
||||
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
|
||||
executor.execute(new ActiveLogParser(logs));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
|
||||
Path appDirPath, AppState appState) {
|
||||
AppLogs appLogs = new AppLogs(appId, appDirPath, appState);
|
||||
AppLogs oldAppLogs = appIdLogMap.putIfAbsent(appId, appLogs);
|
||||
if (oldAppLogs != null) {
|
||||
appLogs = oldAppLogs;
|
||||
}
|
||||
return appLogs;
|
||||
}
|
||||
|
||||
private AppLogs getAndSetActiveLog(ApplicationId appId, Path appDirPath) {
|
||||
AppLogs appLogs = appIdLogMap.get(appId);
|
||||
if (appLogs == null) {
|
||||
appLogs = createAndPutAppLogsIfAbsent(appId, appDirPath, AppState.ACTIVE);
|
||||
}
|
||||
return appLogs;
|
||||
}
|
||||
|
||||
// searches for the app logs and returns it if found else null
|
||||
private AppLogs getAndSetAppLogs(ApplicationId applicationId)
|
||||
throws IOException {
|
||||
LOG.debug("Looking for app logs mapped for app id {}", applicationId);
|
||||
AppLogs appLogs = appIdLogMap.get(applicationId);
|
||||
if (appLogs == null) {
|
||||
AppState appState = AppState.UNKNOWN;
|
||||
Path appDirPath = getDoneAppPath(applicationId);
|
||||
if (fs.exists(appDirPath)) {
|
||||
appState = AppState.COMPLETED;
|
||||
} else {
|
||||
appDirPath = getActiveAppPath(applicationId);
|
||||
if (fs.exists(appDirPath)) {
|
||||
appState = AppState.ACTIVE;
|
||||
}
|
||||
}
|
||||
if (appState != AppState.UNKNOWN) {
|
||||
LOG.debug("Create and try to add new appLogs to appIdLogMap for {}",
|
||||
applicationId);
|
||||
appLogs = createAndPutAppLogsIfAbsent(
|
||||
applicationId, appDirPath, appState);
|
||||
}
|
||||
}
|
||||
return appLogs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Main function for entity log cleaner. This method performs depth first
|
||||
* search from a given dir path for all application log dirs. Once found, it
|
||||
* will decide if the directory should be cleaned up and then clean them.
|
||||
*
|
||||
* @param dirpath the root directory the cleaner should start with. Note that
|
||||
* dirpath should be a directory that contains a set of
|
||||
* application log directories. The cleaner method will not
|
||||
* work if the given dirpath itself is an application log dir.
|
||||
* @param fs
|
||||
* @param retainMillis
|
||||
* @throws IOException
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
|
||||
throws IOException {
|
||||
long now = Time.now();
|
||||
// Depth first search from root directory for all application log dirs
|
||||
RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
|
||||
while (iter.hasNext()) {
|
||||
FileStatus stat = iter.next();
|
||||
if (stat.isDirectory()) {
|
||||
// If current is an application log dir, decide if we need to remove it
|
||||
// and remove if necessary.
|
||||
// Otherwise, keep iterating into it.
|
||||
ApplicationId appId = parseApplicationId(dirpath.getName());
|
||||
if (appId != null) { // Application log dir
|
||||
if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) {
|
||||
try {
|
||||
LOG.info("Deleting {}", dirpath);
|
||||
if (!fs.delete(dirpath, true)) {
|
||||
LOG.error("Unable to remove " + dirpath);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove " + dirpath, e);
|
||||
}
|
||||
}
|
||||
} else { // Keep cleaning inside
|
||||
cleanLogs(stat.getPath(), fs, retainMillis);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean shouldCleanAppLogDir(Path appLogPath, long now,
|
||||
FileSystem fs, long logRetainMillis) throws IOException {
|
||||
RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath);
|
||||
while (iter.hasNext()) {
|
||||
FileStatus stat = iter.next();
|
||||
if (now - stat.getModificationTime() <= logRetainMillis) {
|
||||
// found a dir entry that is fresh enough to prevent
|
||||
// cleaning this directory.
|
||||
LOG.debug("{} not being cleaned due to {}", appLogPath, stat.getPath());
|
||||
return false;
|
||||
}
|
||||
// Otherwise, keep searching files inside for directories.
|
||||
if (stat.isDirectory()) {
|
||||
if (!shouldCleanAppLogDir(stat.getPath(), now, fs, logRetainMillis)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// converts the String to an ApplicationId or null if conversion failed
|
||||
private static ApplicationId parseApplicationId(String appIdStr) {
|
||||
ApplicationId appId = null;
|
||||
if (appIdStr.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||
try {
|
||||
appId = ConverterUtils.toApplicationId(appIdStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
appId = null;
|
||||
}
|
||||
}
|
||||
return appId;
|
||||
}
|
||||
|
||||
private Path getActiveAppPath(ApplicationId appId) {
|
||||
return new Path(activeRootPath, appId.toString());
|
||||
}
|
||||
|
||||
private Path getDoneAppPath(ApplicationId appId) {
|
||||
// cut up the app ID into mod(1000) buckets
|
||||
int appNum = appId.getId();
|
||||
appNum /= 1000;
|
||||
int bucket2 = appNum % 1000;
|
||||
int bucket1 = appNum / 1000;
|
||||
return new Path(doneRootPath,
|
||||
String.format(APP_DONE_DIR_PREFIX_FORMAT, appId.getClusterTimestamp(),
|
||||
bucket1, bucket2, appId.toString()));
|
||||
}
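  // Example of the bucketing above (hypothetical values): for an application
  // with cluster timestamp 1449000000000 and id 1234567, appNum/1000 = 1234,
  // so bucket1 = 1 and bucket2 = 234, and the done path is
  // <doneRoot>/1449000000000/0001/234/application_1449000000000_1234567/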
|
||||
|
||||
// This method has to be synchronized to control traffic to RM
|
||||
private static synchronized AppState getAppState(ApplicationId appId,
|
||||
YarnClient yarnClient) throws IOException {
|
||||
AppState appState = AppState.ACTIVE;
|
||||
try {
|
||||
ApplicationReport report = yarnClient.getApplicationReport(appId);
|
||||
YarnApplicationState yarnState = report.getYarnApplicationState();
|
||||
if (APP_FINAL_STATES.contains(yarnState)) {
|
||||
appState = AppState.COMPLETED;
|
||||
}
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
appState = AppState.UNKNOWN;
|
||||
} catch (YarnException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return appState;
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
enum AppState {
|
||||
ACTIVE,
|
||||
UNKNOWN,
|
||||
COMPLETED
|
||||
}
|
||||
|
||||
class AppLogs {
|
||||
private ApplicationId appId;
|
||||
private Path appDirPath;
|
||||
private AppState appState;
|
||||
private List<LogInfo> summaryLogs = new ArrayList<LogInfo>();
|
||||
private List<LogInfo> detailLogs = new ArrayList<LogInfo>();
|
||||
|
||||
public AppLogs(ApplicationId appId, Path appPath, AppState state) {
|
||||
this.appId = appId;
|
||||
appDirPath = appPath;
|
||||
appState = state;
|
||||
}
|
||||
|
||||
public synchronized boolean isDone() {
|
||||
return appState == AppState.COMPLETED;
|
||||
}
|
||||
|
||||
public synchronized ApplicationId getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public synchronized Path getAppDirPath() {
|
||||
return appDirPath;
|
||||
}
|
||||
|
||||
synchronized List<LogInfo> getSummaryLogs() {
|
||||
return summaryLogs;
|
||||
}
|
||||
|
||||
synchronized List<LogInfo> getDetailLogs() {
|
||||
return detailLogs;
|
||||
}
|
||||
|
||||
public synchronized void parseSummaryLogs() throws IOException {
|
||||
parseSummaryLogs(summaryTdm);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
synchronized void parseSummaryLogs(TimelineDataManager tdm)
|
||||
throws IOException {
|
||||
if (!isDone()) {
|
||||
LOG.debug("Try to parse summary log for log {} in {}",
|
||||
appId, appDirPath);
|
||||
appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
|
||||
long recentLogModTime = scanForLogs();
|
||||
if (appState == AppState.UNKNOWN) {
|
||||
if (Time.now() - recentLogModTime > unknownActiveMillis) {
|
||||
LOG.info(
|
||||
"{} state is UNKNOWN and logs are stale, assuming COMPLETED",
|
||||
appId);
|
||||
appState = AppState.COMPLETED;
|
||||
}
|
||||
}
|
||||
}
|
||||
List<LogInfo> removeList = new ArrayList<LogInfo>();
|
||||
for (LogInfo log : summaryLogs) {
|
||||
if (fs.exists(log.getPath(appDirPath))) {
|
||||
log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
|
||||
objMapper, fs);
|
||||
} else {
|
||||
// The log may have been removed, remove the log
|
||||
removeList.add(log);
|
||||
LOG.info("File {} no longer exists, remove it from log list",
|
||||
log.getPath(appDirPath));
|
||||
}
|
||||
}
|
||||
summaryLogs.removeAll(removeList);
|
||||
}
|
||||
|
||||
// scans for new logs and returns the modification timestamp of the
|
||||
// most recently modified log
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
long scanForLogs() throws IOException {
|
||||
LOG.debug("scanForLogs on {}", appDirPath);
|
||||
long newestModTime = 0;
|
||||
RemoteIterator<FileStatus> iterAttempt =
|
||||
fs.listStatusIterator(appDirPath);
|
||||
while (iterAttempt.hasNext()) {
|
||||
FileStatus statAttempt = iterAttempt.next();
|
||||
LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
|
||||
if (!statAttempt.isDirectory()
|
||||
|| !statAttempt.getPath().getName()
|
||||
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||
LOG.debug("Scanner skips for unknown dir/file {}",
|
||||
statAttempt.getPath());
|
||||
continue;
|
||||
}
|
||||
String attemptDirName = statAttempt.getPath().getName();
|
||||
RemoteIterator<FileStatus> iterCache
|
||||
= fs.listStatusIterator(statAttempt.getPath());
|
||||
while (iterCache.hasNext()) {
|
||||
FileStatus statCache = iterCache.next();
|
||||
if (!statCache.isFile()) {
|
||||
continue;
|
||||
}
|
||||
String filename = statCache.getPath().getName();
|
||||
// We should only update time for log files.
|
||||
boolean shouldSetTime = true;
|
||||
LOG.debug("scan for log file: {}", filename);
|
||||
if (filename.startsWith(DOMAIN_LOG_PREFIX)) {
|
||||
addSummaryLog(attemptDirName, filename, statCache.getOwner(), true);
|
||||
} else if (filename.startsWith(SUMMARY_LOG_PREFIX)) {
|
||||
addSummaryLog(attemptDirName, filename, statCache.getOwner(),
|
||||
false);
|
||||
} else if (filename.startsWith(ENTITY_LOG_PREFIX)) {
|
||||
addDetailLog(attemptDirName, filename, statCache.getOwner());
|
||||
} else {
|
||||
shouldSetTime = false;
|
||||
}
|
||||
if (shouldSetTime) {
|
||||
newestModTime
|
||||
= Math.max(statCache.getModificationTime(), newestModTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if there are no logs in the directory then use the modification
|
||||
// time of the directory itself
|
||||
if (newestModTime == 0) {
|
||||
newestModTime = fs.getFileStatus(appDirPath).getModificationTime();
|
||||
}
|
||||
|
||||
return newestModTime;
|
||||
}
|
||||
|
||||
private void addSummaryLog(String attemptDirName,
|
||||
String filename, String owner, boolean isDomainLog) {
|
||||
for (LogInfo log : summaryLogs) {
|
||||
if (log.getFilename().equals(filename)
|
||||
&& log.getAttemptDirName().equals(attemptDirName)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
LOG.debug("Incoming log {} not present in my summaryLogs list, add it",
|
||||
filename);
|
||||
LogInfo log;
|
||||
if (isDomainLog) {
|
||||
log = new DomainLogInfo(attemptDirName, filename, owner);
|
||||
} else {
|
||||
log = new EntityLogInfo(attemptDirName, filename, owner);
|
||||
}
|
||||
summaryLogs.add(log);
|
||||
}
|
||||
|
||||
private void addDetailLog(String attemptDirName, String filename,
|
||||
String owner) {
|
||||
for (LogInfo log : detailLogs) {
|
||||
if (log.getFilename().equals(filename)
|
||||
&& log.getAttemptDirName().equals(attemptDirName)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
|
||||
}
|
||||
|
||||
public synchronized void moveToDone() throws IOException {
|
||||
Path doneAppPath = getDoneAppPath(appId);
|
||||
if (!doneAppPath.equals(appDirPath)) {
|
||||
Path donePathParent = doneAppPath.getParent();
|
||||
if (!fs.exists(donePathParent)) {
|
||||
fs.mkdirs(donePathParent);
|
||||
}
|
||||
LOG.debug("Application {} is done, trying to move to done dir {}",
|
||||
appId, doneAppPath);
|
||||
if (!fs.rename(appDirPath, doneAppPath)) {
|
||||
throw new IOException("Rename " + appDirPath + " to " + doneAppPath
|
||||
+ " failed");
|
||||
} else {
|
||||
LOG.info("Moved {} to {}", appDirPath, doneAppPath);
|
||||
}
|
||||
appDirPath = doneAppPath;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class EntityLogScanner implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Active scan starting");
|
||||
try {
|
||||
scanActiveLogs();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error scanning active files", e);
|
||||
}
|
||||
LOG.debug("Active scan complete");
|
||||
}
|
||||
}
|
||||
|
||||
private class ActiveLogParser implements Runnable {
|
||||
private AppLogs appLogs;
|
||||
|
||||
public ActiveLogParser(AppLogs logs) {
|
||||
appLogs = logs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.debug("Begin parsing summary logs. ");
|
||||
appLogs.parseSummaryLogs();
|
||||
if (appLogs.isDone()) {
|
||||
appLogs.moveToDone();
|
||||
appIdLogMap.remove(appLogs.getAppId());
|
||||
}
|
||||
LOG.debug("End parsing summary logs. ");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error processing logs for " + appLogs.getAppId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class EntityLogCleaner implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Cleaner starting");
|
||||
try {
|
||||
cleanLogs(doneRootPath, fs, logRetainMillis);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error cleaning files", e);
|
||||
}
|
||||
LOG.debug("Cleaner finished");
|
||||
}
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
void setFs(FileSystem incomingFs) {
|
||||
this.fs = incomingFs;
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
|
||||
cachedLogs.put(groupId, cacheItem);
|
||||
}
|
||||
|
||||
private List<TimelineStore> getTimelineStoresFromCacheIds(
|
||||
Set<TimelineEntityGroupId> groupIds, String entityType)
|
||||
throws IOException {
|
||||
List<TimelineStore> stores = new LinkedList<TimelineStore>();
|
||||
// For now we just handle one store in a context. We return the first
|
||||
// non-null storage for the group ids.
|
||||
for (TimelineEntityGroupId groupId : groupIds) {
|
||||
TimelineStore storeForId = getCachedStore(groupId);
|
||||
if (storeForId != null) {
|
||||
LOG.debug("Adding {} as a store for the query", storeForId.getName());
|
||||
stores.add(storeForId);
|
||||
}
|
||||
}
|
||||
if (stores.size() == 0) {
|
||||
LOG.debug("Using summary store for {}", entityType);
|
||||
stores.add(this.summaryStore);
|
||||
}
|
||||
return stores;
|
||||
}
|
||||
|
||||
private List<TimelineStore> getTimelineStoresForRead(String entityId,
|
||||
String entityType) throws IOException {
|
||||
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
|
||||
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
|
||||
LOG.debug("Trying plugin {} for id {} and type {}",
|
||||
cacheIdPlugin.getClass().getName(), entityId, entityType);
|
||||
Set<TimelineEntityGroupId> idsFromPlugin
|
||||
= cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType);
|
||||
if (idsFromPlugin == null) {
|
||||
LOG.debug("Plugin returned null " + cacheIdPlugin.getClass().getName());
|
||||
} else {
|
||||
LOG.debug("Plugin returned ids: " + idsFromPlugin);
|
||||
}
|
||||
|
||||
if (idsFromPlugin != null) {
|
||||
groupIds.addAll(idsFromPlugin);
|
||||
LOG.debug("plugin {} returns a non-null value on query",
|
||||
cacheIdPlugin.getClass().getName());
|
||||
}
|
||||
}
|
||||
return getTimelineStoresFromCacheIds(groupIds, entityType);
|
||||
}
|
||||
|
||||
private List<TimelineStore> getTimelineStoresForRead(String entityType,
|
||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
|
||||
throws IOException {
|
||||
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
|
||||
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
|
||||
Set<TimelineEntityGroupId> idsFromPlugin =
|
||||
cacheIdPlugin.getTimelineEntityGroupId(entityType, primaryFilter,
|
||||
secondaryFilters);
|
||||
if (idsFromPlugin != null) {
|
||||
LOG.debug("plugin {} returns a non-null value on query {}",
|
||||
cacheIdPlugin.getClass().getName(), idsFromPlugin);
|
||||
groupIds.addAll(idsFromPlugin);
|
||||
}
|
||||
}
|
||||
return getTimelineStoresFromCacheIds(groupIds, entityType);
|
||||
}
|
||||
|
||||
// find a cached timeline store or null if it cannot be located
|
||||
private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
|
||||
throws IOException {
|
||||
EntityCacheItem cacheItem;
|
||||
synchronized (this.cachedLogs) {
|
||||
// Note that the content in the cache log storage may be stale.
|
||||
cacheItem = this.cachedLogs.get(groupId);
|
||||
if (cacheItem == null) {
|
||||
LOG.debug("Set up new cache item for id {}", groupId);
|
||||
cacheItem = new EntityCacheItem(getConfig(), fs);
|
||||
AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
|
||||
if (appLogs != null) {
|
||||
LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
|
||||
cacheItem.setAppLogs(appLogs);
this.cachedLogs.put(groupId, cacheItem);
} else {
LOG.warn("AppLogs for groupId {} is set to null!", groupId);
}
}
}
TimelineStore store = null;
if (cacheItem.getAppLogs() != null) {
AppLogs appLogs = cacheItem.getAppLogs();
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
objMapper);
} else {
LOG.warn("AppLogs for group id {} is null", groupId);
}
return store;
}

@Override
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
List<TimelineStore> stores = getTimelineStoresForRead(entityType,
primaryFilter, secondaryFilters);
TimelineEntities returnEntities = new TimelineEntities();
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {} for the request", store.getName());
returnEntities.addEntities(
store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
checkAcl).getEntities());
}
return returnEntities;
}

@Override
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) throws IOException {
LOG.debug("getEntity type={} id={}", entityType, entityId);
List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) {
return e;
}
}
LOG.debug("getEntity: Found nothing");
return null;
}

@Override
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventTypes) throws IOException {
LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
TimelineEvents returnEvents = new TimelineEvents();
for (String entityId : entityIds) {
LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
List<TimelineStore> stores
= getTimelineStoresForRead(entityId, entityType);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
SortedSet<String> entityIdSet = new TreeSet<>();
entityIdSet.add(entityId);
TimelineEvents events =
store.getEntityTimelines(entityType, entityIdSet, limit,
windowStart, windowEnd, eventTypes);
returnEvents.addEvents(events.getAllEvents());
}
}
return returnEvents;
}

@Override
public TimelineDomain getDomain(String domainId) throws IOException {
return summaryStore.getDomain(domainId);
}

@Override
public TimelineDomains getDomains(String owner) throws IOException {
return summaryStore.getDomains(owner);
}

@Override
public TimelinePutResponse put(TimelineEntities data) throws IOException {
return summaryStore.put(data);
}

@Override
public void put(TimelineDomain domain) throws IOException {
summaryStore.put(domain);
}
}
@@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.MappingIterator;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;

abstract class LogInfo {
public static final String ENTITY_FILE_NAME_DELIMITERS = "_.";

public String getAttemptDirName() {
return attemptDirName;
}

public long getOffset() {
return offset;
}

public void setOffset(long newOffset) {
this.offset = newOffset;
}

private String attemptDirName;
private String filename;
private String user;
private long offset = 0;

private static final Logger LOG = LoggerFactory.getLogger(LogInfo.class);

public LogInfo(String attemptDirName, String file, String owner) {
this.attemptDirName = attemptDirName;
filename = file;
user = owner;
}

public Path getPath(Path rootPath) {
Path attemptPath = new Path(rootPath, attemptDirName);
return new Path(attemptPath, filename);
}

public String getFilename() {
return filename;
}

public boolean matchesGroupId(TimelineEntityGroupId groupId) {
return matchesGroupId(groupId.toString());
}

@InterfaceAudience.Private
@VisibleForTesting
boolean matchesGroupId(String groupId){
// Return true if the group id is a segment (separated by _, ., or end of
// string) of the file name.
int pos = filename.indexOf(groupId);
if (pos < 0) {
return false;
}
return filename.length() == pos + groupId.length()
|| ENTITY_FILE_NAME_DELIMITERS.contains(String.valueOf(
filename.charAt(pos + groupId.length())
));
}

public void parseForStore(TimelineDataManager tdm, Path appDirPath,
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
FileSystem fs) throws IOException {
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
attemptDirName);
Path logPath = getPath(appDirPath);
if (fs.exists(logPath)) {
long startTime = Time.monotonicNow();
try {
LOG.debug("Parsing {} at offset {}", logPath, offset);
long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
objMapper, fs);
LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime);
} catch (RuntimeException e) {
if (e.getCause() instanceof JsonParseException) {
// If AppLogs cannot parse this log, it may be corrupted
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
}
}
} else {
LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
}
}

private long parsePath(TimelineDataManager tdm, Path logPath,
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
FileSystem fs) throws IOException {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(user);
FSDataInputStream in = fs.open(logPath);
JsonParser parser = null;
try {
in.seek(offset);
try {
parser = jsonFactory.createJsonParser(in);
parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted) {
throw e;
} else {
LOG.debug("Exception in parse path: {}", e.getMessage());
return 0;
}
}

return doParse(tdm, parser, objMapper, ugi, appCompleted);
} finally {
IOUtils.closeStream(parser);
IOUtils.closeStream(in);
}
}

protected abstract long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException;
}

class EntityLogInfo extends LogInfo {
private static final Logger LOG = LoggerFactory.getLogger(
EntityGroupFSTimelineStore.class);

public EntityLogInfo(String attemptId,
String file, String owner) {
super(attemptId, file, owner);
}

@Override
protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
TimelineEntities entities = new TimelineEntities();
ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean postError = false;
try {
MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
TimelineEntity.class);

while (iter.hasNext()) {
TimelineEntity entity = iter.next();
String etype = entity.getEntityType();
String eid = entity.getEntityId();
LOG.trace("Read entity {}", etype);
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);

try {
LOG.debug("Adding {}({}) to store", eid, etype);
entityList.add(entity);
entities.setEntities(entityList);
TimelinePutResponse response = tdm.postEntities(entities, ugi);
for (TimelinePutResponse.TimelinePutError e
: response.getErrors()) {
LOG.warn("Error putting entity: {} ({}): {}",
e.getEntityId(), e.getEntityType(), e.getErrorCode());
}
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
entityList.clear();
} catch (YarnException e) {
postError = true;
throw new IOException("Error posting entities", e);
} catch (IOException e) {
postError = true;
throw new IOException("Error posting entities", e);
}
}
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted || postError) {
throw e;
}
} catch (RuntimeException e) {
if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
throw e;
}
}
return count;
}
}

class DomainLogInfo extends LogInfo {
private static final Logger LOG = LoggerFactory.getLogger(
EntityGroupFSTimelineStore.class);

public DomainLogInfo(String attemptDirName, String file,
String owner) {
super(attemptDirName, file, owner);
}

protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean putError = false;
try {
MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
TimelineDomain.class);

while (iter.hasNext()) {
TimelineDomain domain = iter.next();
domain.setOwner(ugi.getShortUserName());
LOG.trace("Read domain {}", domain.getId());
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);

try {
tdm.putDomain(domain, ugi);
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
} catch (YarnException e) {
putError = true;
throw new IOException("Error posting domain", e);
} catch (IOException e) {
putError = true;
throw new IOException("Error posting domain", e);
}
}
} catch (IOException e) {
// if app hasn't completed then there may be errors due to the
// incomplete file which are treated as EOF until app completes
if (appCompleted || putError) {
throw e;
}
} catch (RuntimeException e) {
if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
throw e;
}
}
return count;
}
}
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;

import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;

/**
* Plugin to map a requested query ( or an Entity/set of Entities ) to a CacheID.
* The Cache ID is an identifier to the data set that needs to be queried to
* serve the response for the query.
*/
public abstract class TimelineEntityGroupPlugin {

/**
* Get the {@link TimelineEntityGroupId}s for the data sets that need to be
* scanned to serve the query.
*
* @param entityType Entity Type being queried
* @param primaryFilter Primary filter being applied
* @param secondaryFilters Secondary filters being applied in the query
* @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}
*/
public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
String entityType, NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters);

/**
* Get the {@link TimelineEntityGroupId}s for the data sets that need to be
* scanned to serve the query.
*
* @param entityType Entity Type being queried
* @param entityId Entity Id being requested
* @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}
*/
public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
String entityId,
String entityType);


/**
* Get the {@link TimelineEntityGroupId}s for the data sets that need to be
* scanned to serve the query.
*
* @param entityType Entity Type being queried
* @param entityIds Entity Ids being requested
* @param eventTypes Event Types being requested
* @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}
*/
public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
String entityType, SortedSet<String> entityIds,
Set<String> eventTypes);


}
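
For orientation (illustration only, not part of this patch): a group plugin only has to answer "which entity groups could hold the data for this query?"; the store then uses the returned group ids to locate and cache the corresponding entity log data for the read path. A minimal sketch of one possible implementation follows; the class name PerAppGroupPluginExample, the "appid" primary-filter key, and the entity-id layout are assumptions made for the example, not anything defined by this change.

package org.apache.hadoop.yarn.server.timeline;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.util.ConverterUtils;

// Hypothetical example, not part of this patch. It assumes entity ids are
// prefixed with the owning application id ("application_<ts>_<seq>_...") and
// that readers filter on an "appid" primary filter; every entity of an
// application is grouped under a single group named "all".
public class PerAppGroupPluginExample extends TimelineEntityGroupPlugin {

  private static Set<TimelineEntityGroupId> groupsForApp(String appIdStr) {
    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
    return Collections.singleton(
        TimelineEntityGroupId.newInstance(appId, "all"));
  }

  @Override
  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
      String entityType, NameValuePair primaryFilter,
      Collection<NameValuePair> secondaryFilters) {
    // Only queries that filter on the assumed "appid" key can be mapped.
    if (primaryFilter != null && "appid".equals(primaryFilter.getName())) {
      return groupsForApp(primaryFilter.getValue().toString());
    }
    return Collections.<TimelineEntityGroupId>emptySet();
  }

  @Override
  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
      String entityType) {
    // Assumed id layout: the first three "_"-separated tokens form the app id.
    String[] parts = entityId.split("_");
    return groupsForApp(parts[0] + "_" + parts[1] + "_" + parts[2]);
  }

  @Override
  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
      String entityType, SortedSet<String> entityIds, Set<String> eventTypes) {
    Set<TimelineEntityGroupId> groups = new HashSet<TimelineEntityGroupId>();
    for (String entityId : entityIds) {
      groups.addAll(getTimelineEntityGroupId(entityId, entityType));
    }
    return groups;
  }
}

A concrete plugin of this kind would be enabled by listing its class name under YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, which is how the tests below register EntityGroupPlugInForTest.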
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;

import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;

import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;

class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {

private static TimelineEntityGroupId timelineEntityGroupId
= TimelineEntityGroupId.newInstance(
TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");

@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters) {
return Sets.newHashSet(timelineEntityGroupId);
}

@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
String entityType) {
return Sets.newHashSet(timelineEntityGroupId);
}

@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
SortedSet<String> entityIds,
Set<String> eventTypes) {
return Sets.newHashSet(timelineEntityGroupId);
}

static TimelineEntityGroupId getStandardTimelineGroupId() {
return timelineEntityGroupId;
}
}
@@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

class PluginStoreTestUtils {

static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
throws IOException {
FSDataOutputStream stream;
stream = fs.create(logPath, true);
return stream;
}

static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false);
return mapper;
}

/**
* Create sample entities for testing
* @return two timeline entities in a {@link TimelineEntities} object
*/
static TimelineEntities generateTestEntities() {
TimelineEntities entities = new TimelineEntities();
Map<String, Set<Object>> primaryFilters =
new HashMap<String, Set<Object>>();
Set<Object> l1 = new HashSet<Object>();
l1.add("username");
Set<Object> l2 = new HashSet<Object>();
l2.add(Integer.MAX_VALUE);
Set<Object> l3 = new HashSet<Object>();
l3.add("123abc");
Set<Object> l4 = new HashSet<Object>();
l4.add((long)Integer.MAX_VALUE + 1l);
primaryFilters.put("user", l1);
primaryFilters.put("appname", l2);
primaryFilters.put("other", l3);
primaryFilters.put("long", l4);
Map<String, Object> secondaryFilters = new HashMap<String, Object>();
secondaryFilters.put("startTime", 123456);
secondaryFilters.put("status", "RUNNING");
Map<String, Object> otherInfo1 = new HashMap<String, Object>();
otherInfo1.put("info1", "val1");
otherInfo1.putAll(secondaryFilters);

String entityId1 = "id_1";
String entityType1 = "type_1";
String entityId2 = "id_2";
String entityType2 = "type_2";

Map<String, Set<String>> relatedEntities =
new HashMap<String, Set<String>>();
relatedEntities.put(entityType2, Collections.singleton(entityId2));

TimelineEvent ev3 = createEvent(789l, "launch_event", null);
TimelineEvent ev4 = createEvent(0l, "init_event", null);
List<TimelineEvent> events = new ArrayList<TimelineEvent>();
events.add(ev3);
events.add(ev4);
entities.addEntity(createEntity(entityId2, entityType2, 456l, events, null,
null, null, "domain_id_1"));

TimelineEvent ev1 = createEvent(123l, "start_event", null);
entities.addEntity(createEntity(entityId1, entityType1, 123l,
Collections.singletonList(ev1), relatedEntities, primaryFilters,
otherInfo1, "domain_id_1"));
return entities;
}

static void verifyTestEntities(TimelineDataManager tdm)
throws YarnException, IOException {
TimelineEntity entity1 = tdm.getEntity("type_1", "id_1",
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
TimelineEntity entity2 = tdm.getEntity("type_2", "id_2",
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity1);
assertNotNull(entity2);
assertEquals("Failed to read out entity 1",
(Long) 123l, entity1.getStartTime());
assertEquals("Failed to read out entity 2",
(Long) 456l, entity2.getStartTime());
}

/**
* Create a test entity
*/
static TimelineEntity createEntity(String entityId, String entityType,
Long startTime, List<TimelineEvent> events,
Map<String, Set<String>> relatedEntities,
Map<String, Set<Object>> primaryFilters,
Map<String, Object> otherInfo, String domainId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(entityId);
entity.setEntityType(entityType);
entity.setStartTime(startTime);
entity.setEvents(events);
if (relatedEntities != null) {
for (Map.Entry<String, Set<String>> e : relatedEntities.entrySet()) {
for (String v : e.getValue()) {
entity.addRelatedEntity(e.getKey(), v);
}
}
} else {
entity.setRelatedEntities(null);
}
entity.setPrimaryFilters(primaryFilters);
entity.setOtherInfo(otherInfo);
entity.setDomainId(domainId);
return entity;
}

/**
* Create a test event
*/
static TimelineEvent createEvent(long timestamp, String type, Map<String,
Object> info) {
TimelineEvent event = new TimelineEvent();
event.setTimestamp(timestamp);
event.setEventType(type);
event.setEventInfo(info);
return event;
}

/**
* Write timeline entities to a file system
* @param entities
* @param logPath
* @param fs
* @throws IOException
*/
static void writeEntities(TimelineEntities entities, Path logPath,
FileSystem fs) throws IOException {
FSDataOutputStream outStream = createLogFile(logPath, fs);
JsonGenerator jsonGenerator
= (new JsonFactory()).createJsonGenerator(outStream);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
ObjectMapper objMapper = createObjectMapper();
for (TimelineEntity entity : entities.getEntities()) {
objMapper.writeValue(jsonGenerator, entity);
}
outStream.close();
}

static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore store) {
TimelineACLsManager aclManager = new TimelineACLsManager(config);
TimelineDataManager tdm = new TimelineDataManager(store, aclManager);
tdm.init(config);
return tdm;
}

static TimelineDataManager getTdmWithMemStore(Configuration config) {
TimelineStore store = new MemoryTimelineStore();
TimelineDataManager tdm = getTdmWithStore(config, store);
return tdm;
}

}
@@ -0,0 +1,332 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;

import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {

private static final String SAMPLE_APP_NAME = "1234_5678";

static final ApplicationId TEST_APPLICATION_ID
= ConverterUtils.toApplicationId(
ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);

private static final String TEST_APP_DIR_NAME
= TEST_APPLICATION_ID.toString();
private static final String TEST_ATTEMPT_DIR_NAME
= ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
private static final String TEST_SUMMARY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
private static final String TEST_ENTITY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId();
private static final String TEST_DOMAIN_LOG_FILE_NAME
= EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";

private static final Path TEST_ROOT_DIR
= new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestEntityGroupFSTimelineStore.class.getSimpleName());
private static final Path TEST_APP_DIR_PATH
= new Path(TEST_ROOT_DIR, TEST_APP_DIR_NAME);
private static final Path TEST_ATTEMPT_DIR_PATH
= new Path(TEST_APP_DIR_PATH, TEST_ATTEMPT_DIR_NAME);
private static final Path TEST_DONE_DIR_PATH
= new Path(TEST_ROOT_DIR, "done");

private static Configuration config = new YarnConfiguration();
private static MiniDFSCluster hdfsCluster;
private static FileSystem fs;
private EntityGroupFSTimelineStore store;
private TimelineEntity entityNew;

@Rule
public TestName currTestName = new TestName();

@BeforeClass
public static void setupClass() throws Exception {
config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
config.set(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
TEST_DONE_DIR_PATH.toString());
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster
= new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
}

@Before
public void setup() throws Exception {
createTestFiles();
store = new EntityGroupFSTimelineStore();
if (currTestName.getMethodName().contains("Plugin")) {
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
}
store.init(config);
store.start();
store.setFs(fs);
}

@After
public void tearDown() throws Exception {
fs.delete(TEST_APP_DIR_PATH, true);
store.stop();
}

@AfterClass
public static void tearDownClass() throws Exception {
hdfsCluster.shutdown();
FileContext fileContext = FileContext.getLocalFSFileContext();
fileContext.delete(new Path(
config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true);
}

@Test
public void testAppLogsScanLogs() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
appLogs.scanForLogs();
List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
List<LogInfo> detailLogs = appLogs.getDetailLogs();
assertEquals(2, summaryLogs.size());
assertEquals(1, detailLogs.size());

for (LogInfo log : summaryLogs) {
String fileName = log.getFilename();
assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME)
|| fileName.equals(TEST_DOMAIN_LOG_FILE_NAME));
}

for (LogInfo log : detailLogs) {
String fileName = log.getFilename();
assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
}
}

@Test
public void testMoveToDone() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
Path pathBefore = appLogs.getAppDirPath();
appLogs.moveToDone();
Path pathAfter = appLogs.getAppDirPath();
assertNotEquals(pathBefore, pathAfter);
assertTrue(pathAfter.toString().contains(TEST_DONE_DIR_PATH.toString()));
}

@Test
public void testParseSummaryLogs() throws Exception {
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm);
PluginStoreTestUtils.verifyTestEntities(tdm);
}

@Test
public void testCleanLogs() throws Exception {
// Create test dirs and files
// Irrelevant file, should not be reclaimed
Path irrelevantFilePath = new Path(
TEST_DONE_DIR_PATH, "irrelevant.log");
FSDataOutputStream stream = fs.create(irrelevantFilePath);
stream.close();
// Irrelevant directory, should not be reclaimed
Path irrelevantDirPath = new Path(TEST_DONE_DIR_PATH, "irrelevant");
fs.mkdirs(irrelevantDirPath);

Path doneAppHomeDir = new Path(new Path(TEST_DONE_DIR_PATH, "0000"), "001");
// First application, untouched after creation
Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirClean);
Path filePath = new Path(attemptDirClean, "test.log");
stream = fs.create(filePath);
stream.close();
// Second application, one file touched after creation
Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
Path attemptDirHoldByFile
= new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirHoldByFile);
Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
stream = fs.create(filePathHold);
stream.close();
// Third application, one dir touched after creation
Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirHoldByDir);
Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
fs.mkdirs(dirPathHold);
// Fourth application, empty dirs
Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(attemptDirEmpty);
Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
fs.mkdirs(dirPathEmpty);

// Should retain all logs after this run
EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePath));
assertTrue(fs.exists(filePathHold));
assertTrue(fs.exists(dirPathHold));
assertTrue(fs.exists(dirPathEmpty));

// Make sure the created dir is old enough
Thread.sleep(2000);
// Touch the second application
stream = fs.append(filePathHold);
stream.writeBytes("append");
stream.close();
// Touch the third application by creating a new dir
fs.mkdirs(new Path(dirPathHold, "holdByMe"));

EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);

// Verification after the second cleaner call
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePathHold));
assertTrue(fs.exists(dirPathHold));
assertTrue(fs.exists(doneAppHomeDir));

// appDirClean and appDirEmpty should be cleaned up
assertFalse(fs.exists(appDirClean));
assertFalse(fs.exists(appDirEmpty));
}

@Test
public void testPluginRead() throws Exception {
// Verify precondition
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
// Generate TDM
TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store);

// Verify single entity read
TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
// Verify multiple entities read
TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
for (TimelineEntity entity : entities.getEntities()) {
assertEquals(entityNew.getStartTime(), entity.getStartTime());
}
}

@Test
public void testSummaryRead() throws Exception {
// Load data
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
AppState.COMPLETED);
TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store);
appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm);

// Verify single entity read
PluginStoreTestUtils.verifyTestEntities(tdm);
// Verify multiple entities read
TimelineEntities entities = tdm.getEntities("type_1", null, null, null,
null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
for (TimelineEntity entity : entities.getEntities()) {
assertEquals((Long) 123l, entity.getStartTime());
}

}

private void createTestFiles() throws IOException {
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
PluginStoreTestUtils.writeEntities(entities,
new Path(TEST_ATTEMPT_DIR_PATH, TEST_SUMMARY_LOG_FILE_NAME), fs);

entityNew = PluginStoreTestUtils
.createEntity("id_3", "type_3", 789l, null, null,
null, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
PluginStoreTestUtils.writeEntities(entityList,
new Path(TEST_ATTEMPT_DIR_PATH, TEST_ENTITY_LOG_FILE_NAME), fs);

FSDataOutputStream out = fs.create(
new Path(TEST_ATTEMPT_DIR_PATH, TEST_DOMAIN_LOG_FILE_NAME));
out.close();
}

}
@@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.EnumSet;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class TestLogInfo {

private static final Path TEST_ROOT_DIR = new Path(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestLogInfo.class.getSimpleName());

private static final String TEST_ATTEMPT_DIR_NAME = "test_app";
private static final String TEST_ENTITY_FILE_NAME = "test_entity";
private static final String TEST_DOMAIN_FILE_NAME = "test_domain";
private static final String TEST_BROKEN_FILE_NAME = "test_broken";

private Configuration config = new YarnConfiguration();
private MiniDFSCluster hdfsCluster;
private FileSystem fs;
private ObjectMapper objMapper;

private JsonFactory jsonFactory = new JsonFactory();
private JsonGenerator jsonGenerator;
private FSDataOutputStream outStream = null;
private FSDataOutputStream outStreamDomain = null;

private TimelineDomain testDomain;

private static final short FILE_LOG_DIR_PERMISSIONS = 0770;

@Before
public void setup() throws Exception {
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
Path testAppDirPath = new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME);
fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS));
objMapper = PluginStoreTestUtils.createObjectMapper();

TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities();
writeEntitiesLeaveOpen(testEntities,
new Path(testAppDirPath, TEST_ENTITY_FILE_NAME));

testDomain = new TimelineDomain();
testDomain.setId("domain_1");
testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName());
testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName());
testDomain.setDescription("description");
writeDomainLeaveOpen(testDomain,
new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME));

writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME));
}

@After
public void tearDown() throws Exception {
jsonGenerator.close();
outStream.close();
outStreamDomain.close();
hdfsCluster.shutdown();
}

@Test
public void testMatchesGroupId() throws Exception {
String testGroupId = "app1_group1";
// Match
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
"app1_group1",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
"test_app1_group1",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
// Unmatch
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
// Check delimiters
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat",
UserGroupInformation.getLoginUser().getUserName());
assertTrue(testLogInfo.matchesGroupId(testGroupId));
// Check file names shorter than group id
testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2",
UserGroupInformation.getLoginUser().getUserName());
assertFalse(testLogInfo.matchesGroupId(testGroupId));
}

@Test
public void testParseEntity() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_ENTITY_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
// Verify for the first batch
PluginStoreTestUtils.verifyTestEntities(tdm);
// Load new data
TimelineEntity entityNew = PluginStoreTestUtils
.createEntity("id_3", "type_3", 789l, null, null,
null, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
writeEntitiesLeaveOpen(entityList,
new Path(new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME),
TEST_ENTITY_FILE_NAME));
testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
// Verify the newly added data
TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(),
entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals("Failed to read out entity new",
entityNew.getStartTime(), entity3.getStartTime());
tdm.close();
}

@Test
public void testParseBrokenEntity() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_BROKEN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_BROKEN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
// Try parse, should not fail
testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
tdm.close();
}

@Test
public void testParseDomain() throws Exception {
// Load test data
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
TEST_DOMAIN_FILE_NAME,
UserGroupInformation.getLoginUser().getUserName());
domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
fs);
// Verify domain data
TimelineDomain resultDomain = tdm.getDomain("domain_1",
UserGroupInformation.getLoginUser());
assertNotNull(resultDomain);
assertEquals(testDomain.getReaders(), resultDomain.getReaders());
assertEquals(testDomain.getOwner(), resultDomain.getOwner());
assertEquals(testDomain.getDescription(), resultDomain.getDescription());
}

private void writeBrokenFile(Path logPath) throws IOException {
FSDataOutputStream out = null;
try {
String broken = "{ broken { [[]} broken";
out = PluginStoreTestUtils.createLogFile(logPath, fs);
out.write(broken.getBytes(Charset.forName("UTF-8")));
out.close();
out = null;
} finally {
if (out != null) {
out.close();
}
}
}

// TestLogInfo needs to maintain opened hdfs files so we have to build our own
// write methods
private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath)
throws IOException {
if (outStream == null) {
outStream = PluginStoreTestUtils.createLogFile(logPath, fs);
jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
}
for (TimelineEntity entity : entities.getEntities()) {
objMapper.writeValue(jsonGenerator, entity);
}
outStream.hflush();
}

private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath)
throws IOException {
if (outStreamDomain == null) {
outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs);
}
// Write domain uses its own json generator to isolate from entity writers
JsonGenerator jsonGeneratorLocal
= (new JsonFactory()).createJsonGenerator(outStreamDomain);
jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
objMapper.writeValue(jsonGeneratorLocal, domain);
outStreamDomain.hflush();
}

}
@@ -42,5 +42,6 @@
<module>hadoop-yarn-server-sharedcachemanager</module>
<module>hadoop-yarn-server-tests</module>
<module>hadoop-yarn-server-applicationhistoryservice</module>
<module>hadoop-yarn-server-timeline-pluginstorage</module>
</modules>
</project>