diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4d63a6cddd8..9b985a2216c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -211,6 +211,9 @@ Release 2.8.0 - UNRELEASED YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via junping_du) + YARN-4265. Provide new timeline plugin storage to support fine-grained entity + caching. (Li Lu and Jason Lowe via junping_du) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6299a38e9e4..04f61da8a41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1546,6 +1546,7 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_VERSION = TIMELINE_SERVICE_PREFIX + "version"; public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f; + /** * Comma seperated list of names for UIs hosted in the timeline server * (For pluggable UIs). @@ -1581,6 +1582,63 @@ public class YarnConfiguration extends Configuration { TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT = "/tmp/entity-file-history/active"; + public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "done-dir"; + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT = + "/tmp/entity-file-history/done"; + + public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes"; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store"; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types"; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "scan-interval-seconds"; + public static final long + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT = 60; + + public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "threads"; + public static final int + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT = 16; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE + = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "app-cache-size"; + public static final int + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT = 10; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "cleaner-interval-seconds"; + public static final int + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT = + 60 * 60; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS + = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retain-seconds"; + public static final int + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT = + 7 * 24 * 60 * 60; + + // how old the most recent log of an UNKNOWN app needs to be in the active + // directory before we treat it as COMPLETED + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "unknown-active-seconds"; + public static final int + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT + = 24 * 60 * 60; + public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec"; @@ -1588,10 +1646,6 @@ public class YarnConfiguration extends Configuration { DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = "2000, 500"; - public static final String - TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES = - TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types"; - public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS = TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs"; public static final long diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 13a7b1b2f7d..fe308ec9e79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1978,6 +1978,64 @@ ${hadoop.tmp.dir}/yarn/timeline + + + + yarn.timeline-service.entity-group-fs-store.active-dir + /tmp/entity-file-history/active + HDFS path to store active application’s timeline data + + + + yarn.timeline-service.entity-group-fs-store.done-dir + /tmp/entity-file-history/done/ + HDFS path to store done application’s timeline data + + + + yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes + + + Plugins that can translate a timeline entity read request into + a list of timeline entity group ids, separated by commas. + + + + + yarn.timeline-service.entity-group-fs-store.summary-store + Summary storage for ATS v1.5 + org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore + + + + yarn.timeline-service.entity-group-fs-store.scan-interval-seconds + + Scan interval for ATS v1.5 entity group file system storage reader.This + value controls how frequent the reader will scan the HDFS active directory + for application status. + + 60 + + + + yarn.timeline-service.entity-group-fs-store.cleaner-interval-seconds + + Scan interval for ATS v1.5 entity group file system storage cleaner.This + value controls how frequent the reader will scan the HDFS done directory + for stale application data. + + 3600 + + + + yarn.timeline-service.entity-group-fs-store.retain-seconds + + How long the ATS v1.5 entity group file system storage will keep an + application's data in the done directory. + + 604800 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index 1d3ecef064b..64814a57883 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -220,6 +220,17 @@ + + maven-jar-plugin + + + + test-jar + + test-compile + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index cda84a22eac..d0af7789417 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -233,8 +233,9 @@ public class ApplicationHistoryServer extends CompositeService { } private TimelineDataManager createTimelineDataManager(Configuration conf) { - return new TimelineDataManager( - timelineStore, new TimelineACLsManager(conf)); + TimelineACLsManager aclsMgr = new TimelineACLsManager(conf); + aclsMgr.setTimelineStore(timelineStore); + return new TimelineDataManager(timelineStore, aclsMgr); } @SuppressWarnings("unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java index 23ff8e4ddac..57a9346602c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java @@ -67,7 +67,6 @@ public class TimelineDataManager extends AbstractService { super(TimelineDataManager.class.getName()); this.store = store; this.timelineACLsManager = timelineACLsManager; - timelineACLsManager.setTimelineStore(store); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java index afd58184975..3591b39db10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java @@ -92,12 +92,17 @@ public class TimelineDataManagerMetrics { getDomainsOps.value(); } + private static TimelineDataManagerMetrics instance = null; + TimelineDataManagerMetrics() { } - public static TimelineDataManagerMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(new TimelineDataManagerMetrics()); + public static synchronized TimelineDataManagerMetrics create() { + if (instance == null) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + instance = ms.register(new TimelineDataManagerMetrics()); + } + return instance; } public void incrGetEntitiesOps() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java index 1e98e8d2502..7ef6eca0dca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java @@ -65,6 +65,7 @@ public class TestApplicationHistoryClientService { TimelineStore store = TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS); TimelineACLsManager aclsManager = new TimelineACLsManager(conf); + aclsManager.setTimelineStore(store); dataManager = new TimelineDataManager(store, aclsManager); dataManager.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java index a669f37d3e5..dfc5b81b579 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java @@ -96,6 +96,7 @@ public class TestApplicationHistoryManagerOnTimelineStore { public void setup() throws Exception { // Only test the ACLs of the generic history TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration()); + aclsManager.setTimelineStore(store); TimelineDataManager dataManager = new TimelineDataManager(store, aclsManager); dataManager.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index f2179b457b8..20dfe454cae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -90,6 +90,7 @@ public class TestAHSWebServices extends JerseyTestBase { TimelineStore store = TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS); TimelineACLsManager aclsManager = new TimelineACLsManager(conf); + aclsManager.setTimelineStore(store); TimelineDataManager dataManager = new TimelineDataManager(store, aclsManager); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java index ace2eb8209e..8fba54c9e80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java @@ -62,6 +62,7 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils { conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false); aclsManager = new TimelineACLsManager(conf); + aclsManager.setTimelineStore(store); dataManaer = new TimelineDataManager(store, aclsManager); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml new file mode 100644 index 00000000000..385ba5ddd5a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml @@ -0,0 +1,136 @@ + + + + + hadoop-yarn-server + org.apache.hadoop + 3.0.0-SNAPSHOT + + 4.0.0 + org.apache.hadoop + hadoop-yarn-server-timeline-pluginstorage + 3.0.0-SNAPSHOT + Apache Hadoop YARN Timeline Plugin Storage + + + + ${project.parent.parent.basedir} + + + + + + org.apache.hadoop + hadoop-common + provided + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + + + + junit + junit + test + + + org.apache.hadoop + hadoop-common + test-jar + test + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + hadoop-yarn-server-applicationhistoryservice + ${project.version} + + + org.apache.hadoop + hadoop-yarn-server-applicationhistoryservice + ${project.version} + test-jar + test + + + org.slf4j + slf4j-api + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-xc + + + com.google.guava + guava + + + org.apache.hadoop + hadoop-hdfs + test + + + org.apache.hadoop + hadoop-hdfs + test + test-jar + + + org.mockito + mockito-all + test + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java new file mode 100644 index 00000000000..37a1d8dad88 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Cache item for timeline server v1.5 reader cache. Each cache item has a + * TimelineStore that can be filled with data within one entity group. + */ +public class EntityCacheItem { + private static final Logger LOG + = LoggerFactory.getLogger(EntityCacheItem.class); + + private TimelineStore store; + private EntityGroupFSTimelineStore.AppLogs appLogs; + private long lastRefresh; + private Configuration config; + private FileSystem fs; + + public EntityCacheItem(Configuration config, FileSystem fs) { + this.config = config; + this.fs = fs; + } + + /** + * @return The application log associated to this cache item, may be null. + */ + public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() { + return this.appLogs; + } + + /** + * Set the application logs to this cache item. The entity group should be + * associated with this application. + * + * @param incomingAppLogs + */ + public synchronized void setAppLogs( + EntityGroupFSTimelineStore.AppLogs incomingAppLogs) { + this.appLogs = incomingAppLogs; + } + + /** + * @return The timeline store, either loaded or unloaded, of this cache item. + */ + public synchronized TimelineStore getStore() { + return store; + } + + /** + * Refresh this cache item if it needs refresh. This will enforce an appLogs + * rescan and then load new data. The refresh process is synchronized with + * other operations on the same cache item. + * + * @param groupId + * @param aclManager + * @param jsonFactory + * @param objMapper + * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore} + * object filled with all entities in the group. + * @throws IOException + */ + public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, + TimelineACLsManager aclManager, JsonFactory jsonFactory, + ObjectMapper objMapper) throws IOException { + if (needRefresh()) { + // If an application is not finished, we only update summary logs (and put + // new entities into summary storage). + // Otherwise, since the application is done, we can update detail logs. + if (!appLogs.isDone()) { + appLogs.parseSummaryLogs(); + } else if (appLogs.getDetailLogs().isEmpty()) { + appLogs.scanForLogs(); + } + if (!appLogs.getDetailLogs().isEmpty()) { + if (store == null) { + store = new MemoryTimelineStore(); + store.init(config); + store.start(); + } + TimelineDataManager tdm = new TimelineDataManager(store, + aclManager); + tdm.init(config); + tdm.start(); + List removeList = new ArrayList(); + for (LogInfo log : appLogs.getDetailLogs()) { + LOG.debug("Try refresh logs for {}", log.getFilename()); + // Only refresh the log that matches the cache id + if (log.matchesGroupId(groupId)) { + Path appDirPath = appLogs.getAppDirPath(); + if (fs.exists(log.getPath(appDirPath))) { + LOG.debug("Refresh logs for cache id {}", groupId); + log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory, + objMapper, fs); + } else { + // The log may have been removed, remove the log + removeList.add(log); + LOG.info("File {} no longer exists, remove it from log list", + log.getPath(appDirPath)); + } + } + } + appLogs.getDetailLogs().removeAll(removeList); + tdm.close(); + } + updateRefreshTimeToNow(); + } else { + LOG.debug("Cache new enough, skip refreshing"); + } + return store; + } + + /** + * Release the cache item for the given group id. + * + * @param groupId + */ + public synchronized void releaseCache(TimelineEntityGroupId groupId) { + try { + if (store != null) { + store.close(); + } + } catch (IOException e) { + LOG.warn("Error closing timeline store", e); + } + store = null; + // reset offsets so next time logs are re-parsed + for (LogInfo log : appLogs.getDetailLogs()) { + if (log.getFilename().contains(groupId.toString())) { + log.setOffset(0); + } + } + } + + private boolean needRefresh() { + return (Time.monotonicNow() - lastRefresh > 10000); + } + + private void updateRefreshTimeToNow() { + this.lastRefresh = Time.monotonicNow(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java new file mode 100644 index 00000000000..b1fbd132d33 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -0,0 +1,895 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; +import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.MappingJsonFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Plugin timeline storage to support timeline server v1.5 API. This storage + * uses a file system to store timeline entities in their groups. + */ +public class EntityGroupFSTimelineStore extends AbstractService + implements TimelineStore { + + static final String DOMAIN_LOG_PREFIX = "domainlog-"; + static final String SUMMARY_LOG_PREFIX = "summarylog-"; + static final String ENTITY_LOG_PREFIX = "entitylog-"; + + private static final Logger LOG = LoggerFactory.getLogger( + EntityGroupFSTimelineStore.class); + private static final FsPermission ACTIVE_DIR_PERMISSION = + new FsPermission((short) 01777); + private static final FsPermission DONE_DIR_PERMISSION = + new FsPermission((short) 0700); + + private static final EnumSet + APP_FINAL_STATES = EnumSet.of( + YarnApplicationState.FAILED, + YarnApplicationState.KILLED, + YarnApplicationState.FINISHED); + // Active dir: /appId/attemptId/cacheId.log + // Done dir: /cluster_ts/hash1/hash2/appId/attemptId/cacheId.log + private static final String APP_DONE_DIR_PREFIX_FORMAT = + "%d" + Path.SEPARATOR // cluster timestamp + + "%04d" + Path.SEPARATOR // app num / 1,000,000 + + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000 + + "%s" + Path.SEPARATOR; // full app id + + private YarnClient yarnClient; + private TimelineStore summaryStore; + private TimelineACLsManager aclManager; + private TimelineDataManager summaryTdm; + private ConcurrentMap appIdLogMap = + new ConcurrentHashMap(); + private ScheduledThreadPoolExecutor executor; + private FileSystem fs; + private ObjectMapper objMapper; + private JsonFactory jsonFactory; + private Path activeRootPath; + private Path doneRootPath; + private long logRetainMillis; + private long unknownActiveMillis; + private int appCacheMaxSize = 0; + private List cacheIdPlugins; + private Map cachedLogs; + + public EntityGroupFSTimelineStore() { + super(EntityGroupFSTimelineStore.class.getSimpleName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + summaryStore = createSummaryStore(); + summaryStore.init(conf); + long logRetainSecs = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT); + logRetainMillis = logRetainSecs * 1000; + LOG.info("Cleaner set to delete logs older than {} seconds", logRetainSecs); + long unknownActiveSecs = conf.getLong( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS, + YarnConfiguration. + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT + ); + unknownActiveMillis = unknownActiveSecs * 1000; + LOG.info("Unknown apps will be treated as complete after {} seconds", + unknownActiveSecs); + appCacheMaxSize = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT); + LOG.info("Application cache size is {}", appCacheMaxSize); + cachedLogs = Collections.synchronizedMap( + new LinkedHashMap( + appCacheMaxSize + 1, 0.75f, true) { + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + if (super.size() > appCacheMaxSize) { + TimelineEntityGroupId groupId = eldest.getKey(); + LOG.debug("Evicting {} due to space limitations", groupId); + EntityCacheItem cacheItem = eldest.getValue(); + cacheItem.releaseCache(groupId); + if (cacheItem.getAppLogs().isDone()) { + appIdLogMap.remove(groupId.getApplicationId()); + } + return true; + } + return false; + } + }); + cacheIdPlugins = loadPlugIns(conf); + // Initialize yarn client for application status + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + super.serviceInit(conf); + } + + private List loadPlugIns(Configuration conf) + throws RuntimeException { + Collection pluginNames = conf.getStringCollection( + YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES); + List pluginList + = new LinkedList(); + for (final String name : pluginNames) { + LOG.debug("Trying to load plugin class {}", name); + TimelineEntityGroupPlugin cacheIdPlugin = null; + try { + Class clazz = conf.getClassByName(name); + cacheIdPlugin = + (TimelineEntityGroupPlugin) ReflectionUtils.newInstance( + clazz, conf); + } catch (Exception e) { + LOG.warn("Error loading plugin " + name, e); + } + + if (cacheIdPlugin == null) { + throw new RuntimeException("No class defined for " + name); + } + LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName()); + pluginList.add(cacheIdPlugin); + } + return pluginList; + } + + private TimelineStore createSummaryStore() { + return ReflectionUtils.newInstance(getConfig().getClass( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE, + LeveldbTimelineStore.class, TimelineStore.class), getConfig()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting {}", getName()); + yarnClient.start(); + summaryStore.start(); + + Configuration conf = getConfig(); + aclManager = new TimelineACLsManager(conf); + aclManager.setTimelineStore(summaryStore); + summaryTdm = new TimelineDataManager(summaryStore, aclManager); + summaryTdm.init(conf); + summaryTdm.start(); + activeRootPath = new Path(conf.get( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT)); + doneRootPath = new Path(conf.get( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT)); + fs = activeRootPath.getFileSystem(conf); + if (!fs.exists(activeRootPath)) { + fs.mkdirs(activeRootPath); + fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION); + } + if (!fs.exists(doneRootPath)) { + fs.mkdirs(doneRootPath); + fs.setPermission(doneRootPath, DONE_DIR_PERMISSION); + } + + objMapper = new ObjectMapper(); + objMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector()); + jsonFactory = new MappingJsonFactory(objMapper); + final long scanIntervalSecs = conf.getLong( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT + ); + final long cleanerIntervalSecs = conf.getLong( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT + ); + final int numThreads = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT); + LOG.info("Scanning active directory every {} seconds", scanIntervalSecs); + LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs); + + executor = new ScheduledThreadPoolExecutor(numThreads, + new ThreadFactoryBuilder().setNameFormat("EntityLogPluginWorker #%d") + .build()); + executor.scheduleAtFixedRate(new EntityLogScanner(), 0, scanIntervalSecs, + TimeUnit.SECONDS); + executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs, + cleanerIntervalSecs, TimeUnit.SECONDS); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping {}", getName()); + if (executor != null) { + executor.shutdown(); + if (executor.isTerminating()) { + LOG.info("Waiting for executor to terminate"); + boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS); + if (terminated) { + LOG.info("Executor terminated"); + } else { + LOG.warn("Executor did not terminate"); + executor.shutdownNow(); + } + } + } + if (summaryTdm != null) { + summaryTdm.stop(); + } + if (summaryStore != null) { + summaryStore.stop(); + } + if (yarnClient != null) { + yarnClient.stop(); + } + synchronized (cachedLogs) { + for (EntityCacheItem cacheItem : cachedLogs.values()) { + cacheItem.getStore().close(); + } + } + super.serviceStop(); + } + + @InterfaceAudience.Private + @VisibleForTesting + void scanActiveLogs() throws IOException { + RemoteIterator iter = fs.listStatusIterator(activeRootPath); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + ApplicationId appId = parseApplicationId(stat.getPath().getName()); + if (appId != null) { + LOG.debug("scan logs for {} in {}", appId, stat.getPath()); + AppLogs logs = getAndSetActiveLog(appId, stat.getPath()); + executor.execute(new ActiveLogParser(logs)); + } + } + } + + private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId, + Path appDirPath, AppState appState) { + AppLogs appLogs = new AppLogs(appId, appDirPath, appState); + AppLogs oldAppLogs = appIdLogMap.putIfAbsent(appId, appLogs); + if (oldAppLogs != null) { + appLogs = oldAppLogs; + } + return appLogs; + } + + private AppLogs getAndSetActiveLog(ApplicationId appId, Path appDirPath) { + AppLogs appLogs = appIdLogMap.get(appId); + if (appLogs == null) { + appLogs = createAndPutAppLogsIfAbsent(appId, appDirPath, AppState.ACTIVE); + } + return appLogs; + } + + // searches for the app logs and returns it if found else null + private AppLogs getAndSetAppLogs(ApplicationId applicationId) + throws IOException { + LOG.debug("Looking for app logs mapped for app id {}", applicationId); + AppLogs appLogs = appIdLogMap.get(applicationId); + if (appLogs == null) { + AppState appState = AppState.UNKNOWN; + Path appDirPath = getDoneAppPath(applicationId); + if (fs.exists(appDirPath)) { + appState = AppState.COMPLETED; + } else { + appDirPath = getActiveAppPath(applicationId); + if (fs.exists(appDirPath)) { + appState = AppState.ACTIVE; + } + } + if (appState != AppState.UNKNOWN) { + LOG.debug("Create and try to add new appLogs to appIdLogMap for {}", + applicationId); + appLogs = createAndPutAppLogsIfAbsent( + applicationId, appDirPath, appState); + } + } + return appLogs; + } + + /** + * Main function for entity log cleaner. This method performs depth first + * search from a given dir path for all application log dirs. Once found, it + * will decide if the directory should be cleaned up and then clean them. + * + * @param dirpath the root directory the cleaner should start with. Note that + * dirpath should be a directory that contains a set of + * application log directories. The cleaner method will not + * work if the given dirpath itself is an application log dir. + * @param fs + * @param retainMillis + * @throws IOException + */ + @InterfaceAudience.Private + @VisibleForTesting + static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) + throws IOException { + long now = Time.now(); + // Depth first search from root directory for all application log dirs + RemoteIterator iter = fs.listStatusIterator(dirpath); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + if (stat.isDirectory()) { + // If current is an application log dir, decide if we need to remove it + // and remove if necessary. + // Otherwise, keep iterating into it. + ApplicationId appId = parseApplicationId(dirpath.getName()); + if (appId != null) { // Application log dir + if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) { + try { + LOG.info("Deleting {}", dirpath); + if (!fs.delete(dirpath, true)) { + LOG.error("Unable to remove " + dirpath); + } + } catch (IOException e) { + LOG.error("Unable to remove " + dirpath, e); + } + } + } else { // Keep cleaning inside + cleanLogs(stat.getPath(), fs, retainMillis); + } + } + } + } + + private static boolean shouldCleanAppLogDir(Path appLogPath, long now, + FileSystem fs, long logRetainMillis) throws IOException { + RemoteIterator iter = fs.listStatusIterator(appLogPath); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + if (now - stat.getModificationTime() <= logRetainMillis) { + // found a dir entry that is fresh enough to prevent + // cleaning this directory. + LOG.debug("{} not being cleaned due to {}", appLogPath, stat.getPath()); + return false; + } + // Otherwise, keep searching files inside for directories. + if (stat.isDirectory()) { + if (!shouldCleanAppLogDir(stat.getPath(), now, fs, logRetainMillis)) { + return false; + } + } + } + return true; + } + + // converts the String to an ApplicationId or null if conversion failed + private static ApplicationId parseApplicationId(String appIdStr) { + ApplicationId appId = null; + if (appIdStr.startsWith(ApplicationId.appIdStrPrefix)) { + try { + appId = ConverterUtils.toApplicationId(appIdStr); + } catch (IllegalArgumentException e) { + appId = null; + } + } + return appId; + } + + private Path getActiveAppPath(ApplicationId appId) { + return new Path(activeRootPath, appId.toString()); + } + + private Path getDoneAppPath(ApplicationId appId) { + // cut up the app ID into mod(1000) buckets + int appNum = appId.getId(); + appNum /= 1000; + int bucket2 = appNum % 1000; + int bucket1 = appNum / 1000; + return new Path(doneRootPath, + String.format(APP_DONE_DIR_PREFIX_FORMAT, appId.getClusterTimestamp(), + bucket1, bucket2, appId.toString())); + } + + // This method has to be synchronized to control traffic to RM + private static synchronized AppState getAppState(ApplicationId appId, + YarnClient yarnClient) throws IOException { + AppState appState = AppState.ACTIVE; + try { + ApplicationReport report = yarnClient.getApplicationReport(appId); + YarnApplicationState yarnState = report.getYarnApplicationState(); + if (APP_FINAL_STATES.contains(yarnState)) { + appState = AppState.COMPLETED; + } + } catch (ApplicationNotFoundException e) { + appState = AppState.UNKNOWN; + } catch (YarnException e) { + throw new IOException(e); + } + return appState; + } + + @InterfaceAudience.Private + @VisibleForTesting + enum AppState { + ACTIVE, + UNKNOWN, + COMPLETED + } + + class AppLogs { + private ApplicationId appId; + private Path appDirPath; + private AppState appState; + private List summaryLogs = new ArrayList(); + private List detailLogs = new ArrayList(); + + public AppLogs(ApplicationId appId, Path appPath, AppState state) { + this.appId = appId; + appDirPath = appPath; + appState = state; + } + + public synchronized boolean isDone() { + return appState == AppState.COMPLETED; + } + + public synchronized ApplicationId getAppId() { + return appId; + } + + public synchronized Path getAppDirPath() { + return appDirPath; + } + + synchronized List getSummaryLogs() { + return summaryLogs; + } + + synchronized List getDetailLogs() { + return detailLogs; + } + + public synchronized void parseSummaryLogs() throws IOException { + parseSummaryLogs(summaryTdm); + } + + @InterfaceAudience.Private + @VisibleForTesting + synchronized void parseSummaryLogs(TimelineDataManager tdm) + throws IOException { + if (!isDone()) { + LOG.debug("Try to parse summary log for log {} in {}", + appId, appDirPath); + appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient); + long recentLogModTime = scanForLogs(); + if (appState == AppState.UNKNOWN) { + if (Time.now() - recentLogModTime > unknownActiveMillis) { + LOG.info( + "{} state is UNKNOWN and logs are stale, assuming COMPLETED", + appId); + appState = AppState.COMPLETED; + } + } + } + List removeList = new ArrayList(); + for (LogInfo log : summaryLogs) { + if (fs.exists(log.getPath(appDirPath))) { + log.parseForStore(tdm, appDirPath, isDone(), jsonFactory, + objMapper, fs); + } else { + // The log may have been removed, remove the log + removeList.add(log); + LOG.info("File {} no longer exists, remove it from log list", + log.getPath(appDirPath)); + } + } + summaryLogs.removeAll(removeList); + } + + // scans for new logs and returns the modification timestamp of the + // most recently modified log + @InterfaceAudience.Private + @VisibleForTesting + long scanForLogs() throws IOException { + LOG.debug("scanForLogs on {}", appDirPath); + long newestModTime = 0; + RemoteIterator iterAttempt = + fs.listStatusIterator(appDirPath); + while (iterAttempt.hasNext()) { + FileStatus statAttempt = iterAttempt.next(); + LOG.debug("scanForLogs on {}", statAttempt.getPath().getName()); + if (!statAttempt.isDirectory() + || !statAttempt.getPath().getName() + .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + LOG.debug("Scanner skips for unknown dir/file {}", + statAttempt.getPath()); + continue; + } + String attemptDirName = statAttempt.getPath().getName(); + RemoteIterator iterCache + = fs.listStatusIterator(statAttempt.getPath()); + while (iterCache.hasNext()) { + FileStatus statCache = iterCache.next(); + if (!statCache.isFile()) { + continue; + } + String filename = statCache.getPath().getName(); + // We should only update time for log files. + boolean shouldSetTime = true; + LOG.debug("scan for log file: {}", filename); + if (filename.startsWith(DOMAIN_LOG_PREFIX)) { + addSummaryLog(attemptDirName, filename, statCache.getOwner(), true); + } else if (filename.startsWith(SUMMARY_LOG_PREFIX)) { + addSummaryLog(attemptDirName, filename, statCache.getOwner(), + false); + } else if (filename.startsWith(ENTITY_LOG_PREFIX)) { + addDetailLog(attemptDirName, filename, statCache.getOwner()); + } else { + shouldSetTime = false; + } + if (shouldSetTime) { + newestModTime + = Math.max(statCache.getModificationTime(), newestModTime); + } + } + } + + // if there are no logs in the directory then use the modification + // time of the directory itself + if (newestModTime == 0) { + newestModTime = fs.getFileStatus(appDirPath).getModificationTime(); + } + + return newestModTime; + } + + private void addSummaryLog(String attemptDirName, + String filename, String owner, boolean isDomainLog) { + for (LogInfo log : summaryLogs) { + if (log.getFilename().equals(filename) + && log.getAttemptDirName().equals(attemptDirName)) { + return; + } + } + LOG.debug("Incoming log {} not present in my summaryLogs list, add it", + filename); + LogInfo log; + if (isDomainLog) { + log = new DomainLogInfo(attemptDirName, filename, owner); + } else { + log = new EntityLogInfo(attemptDirName, filename, owner); + } + summaryLogs.add(log); + } + + private void addDetailLog(String attemptDirName, String filename, + String owner) { + for (LogInfo log : detailLogs) { + if (log.getFilename().equals(filename) + && log.getAttemptDirName().equals(attemptDirName)) { + return; + } + } + detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner)); + } + + public synchronized void moveToDone() throws IOException { + Path doneAppPath = getDoneAppPath(appId); + if (!doneAppPath.equals(appDirPath)) { + Path donePathParent = doneAppPath.getParent(); + if (!fs.exists(donePathParent)) { + fs.mkdirs(donePathParent); + } + LOG.debug("Application {} is done, trying to move to done dir {}", + appId, doneAppPath); + if (!fs.rename(appDirPath, doneAppPath)) { + throw new IOException("Rename " + appDirPath + " to " + doneAppPath + + " failed"); + } else { + LOG.info("Moved {} to {}", appDirPath, doneAppPath); + } + appDirPath = doneAppPath; + } + } + } + + private class EntityLogScanner implements Runnable { + @Override + public void run() { + LOG.debug("Active scan starting"); + try { + scanActiveLogs(); + } catch (Exception e) { + LOG.error("Error scanning active files", e); + } + LOG.debug("Active scan complete"); + } + } + + private class ActiveLogParser implements Runnable { + private AppLogs appLogs; + + public ActiveLogParser(AppLogs logs) { + appLogs = logs; + } + + @Override + public void run() { + try { + LOG.debug("Begin parsing summary logs. "); + appLogs.parseSummaryLogs(); + if (appLogs.isDone()) { + appLogs.moveToDone(); + appIdLogMap.remove(appLogs.getAppId()); + } + LOG.debug("End parsing summary logs. "); + } catch (Exception e) { + LOG.error("Error processing logs for " + appLogs.getAppId(), e); + } + } + } + + private class EntityLogCleaner implements Runnable { + @Override + public void run() { + LOG.debug("Cleaner starting"); + try { + cleanLogs(doneRootPath, fs, logRetainMillis); + } catch (Exception e) { + LOG.error("Error cleaning files", e); + } + LOG.debug("Cleaner finished"); + } + } + + @InterfaceAudience.Private + @VisibleForTesting + void setFs(FileSystem incomingFs) { + this.fs = incomingFs; + } + + @InterfaceAudience.Private + @VisibleForTesting + void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { + cachedLogs.put(groupId, cacheItem); + } + + private List getTimelineStoresFromCacheIds( + Set groupIds, String entityType) + throws IOException { + List stores = new LinkedList(); + // For now we just handle one store in a context. We return the first + // non-null storage for the group ids. + for (TimelineEntityGroupId groupId : groupIds) { + TimelineStore storeForId = getCachedStore(groupId); + if (storeForId != null) { + LOG.debug("Adding {} as a store for the query", storeForId.getName()); + stores.add(storeForId); + } + } + if (stores.size() == 0) { + LOG.debug("Using summary store for {}", entityType); + stores.add(this.summaryStore); + } + return stores; + } + + private List getTimelineStoresForRead(String entityId, + String entityType) throws IOException { + Set groupIds = new HashSet(); + for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) { + LOG.debug("Trying plugin {} for id {} and type {}", + cacheIdPlugin.getClass().getName(), entityId, entityType); + Set idsFromPlugin + = cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType); + if (idsFromPlugin == null) { + LOG.debug("Plugin returned null " + cacheIdPlugin.getClass().getName()); + } else { + LOG.debug("Plugin returned ids: " + idsFromPlugin); + } + + if (idsFromPlugin != null) { + groupIds.addAll(idsFromPlugin); + LOG.debug("plugin {} returns a non-null value on query", + cacheIdPlugin.getClass().getName()); + } + } + return getTimelineStoresFromCacheIds(groupIds, entityType); + } + + private List getTimelineStoresForRead(String entityType, + NameValuePair primaryFilter, Collection secondaryFilters) + throws IOException { + Set groupIds = new HashSet(); + for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) { + Set idsFromPlugin = + cacheIdPlugin.getTimelineEntityGroupId(entityType, primaryFilter, + secondaryFilters); + if (idsFromPlugin != null) { + LOG.debug("plugin {} returns a non-null value on query {}", + cacheIdPlugin.getClass().getName(), idsFromPlugin); + groupIds.addAll(idsFromPlugin); + } + } + return getTimelineStoresFromCacheIds(groupIds, entityType); + } + + // find a cached timeline store or null if it cannot be located + private TimelineStore getCachedStore(TimelineEntityGroupId groupId) + throws IOException { + EntityCacheItem cacheItem; + synchronized (this.cachedLogs) { + // Note that the content in the cache log storage may be stale. + cacheItem = this.cachedLogs.get(groupId); + if (cacheItem == null) { + LOG.debug("Set up new cache item for id {}", groupId); + cacheItem = new EntityCacheItem(getConfig(), fs); + AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId()); + if (appLogs != null) { + LOG.debug("Set applogs {} for group id {}", appLogs, groupId); + cacheItem.setAppLogs(appLogs); + this.cachedLogs.put(groupId, cacheItem); + } else { + LOG.warn("AppLogs for groupId {} is set to null!", groupId); + } + } + } + TimelineStore store = null; + if (cacheItem.getAppLogs() != null) { + AppLogs appLogs = cacheItem.getAppLogs(); + LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId()); + store = cacheItem.refreshCache(groupId, aclManager, jsonFactory, + objMapper); + } else { + LOG.warn("AppLogs for group id {} is null", groupId); + } + return store; + } + + @Override + public TimelineEntities getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fieldsToRetrieve, CheckAcl checkAcl) throws IOException { + LOG.debug("getEntities type={} primary={}", entityType, primaryFilter); + List stores = getTimelineStoresForRead(entityType, + primaryFilter, secondaryFilters); + TimelineEntities returnEntities = new TimelineEntities(); + for (TimelineStore store : stores) { + LOG.debug("Try timeline store {} for the request", store.getName()); + returnEntities.addEntities( + store.getEntities(entityType, limit, windowStart, windowEnd, fromId, + fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve, + checkAcl).getEntities()); + } + return returnEntities; + } + + @Override + public TimelineEntity getEntity(String entityId, String entityType, + EnumSet fieldsToRetrieve) throws IOException { + LOG.debug("getEntity type={} id={}", entityType, entityId); + List stores = getTimelineStoresForRead(entityId, entityType); + for (TimelineStore store : stores) { + LOG.debug("Try timeline store {}:{} for the request", store.getName(), + store.toString()); + TimelineEntity e = + store.getEntity(entityId, entityType, fieldsToRetrieve); + if (e != null) { + return e; + } + } + LOG.debug("getEntity: Found nothing"); + return null; + } + + @Override + public TimelineEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventTypes) throws IOException { + LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds); + TimelineEvents returnEvents = new TimelineEvents(); + for (String entityId : entityIds) { + LOG.debug("getEntityTimeline type={} id={}", entityType, entityId); + List stores + = getTimelineStoresForRead(entityId, entityType); + for (TimelineStore store : stores) { + LOG.debug("Try timeline store {}:{} for the request", store.getName(), + store.toString()); + SortedSet entityIdSet = new TreeSet<>(); + entityIdSet.add(entityId); + TimelineEvents events = + store.getEntityTimelines(entityType, entityIdSet, limit, + windowStart, windowEnd, eventTypes); + returnEvents.addEvents(events.getAllEvents()); + } + } + return returnEvents; + } + + @Override + public TimelineDomain getDomain(String domainId) throws IOException { + return summaryStore.getDomain(domainId); + } + + @Override + public TimelineDomains getDomains(String owner) throws IOException { + return summaryStore.getDomains(owner); + } + + @Override + public TimelinePutResponse put(TimelineEntities data) throws IOException { + return summaryStore.put(data); + } + + @Override + public void put(TimelineDomain domain) throws IOException { + summaryStore.put(domain); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java new file mode 100644 index 00000000000..4caed8dffd2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.MappingIterator; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; + +abstract class LogInfo { + public static final String ENTITY_FILE_NAME_DELIMITERS = "_."; + + public String getAttemptDirName() { + return attemptDirName; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long newOffset) { + this.offset = newOffset; + } + + private String attemptDirName; + private String filename; + private String user; + private long offset = 0; + + private static final Logger LOG = LoggerFactory.getLogger(LogInfo.class); + + public LogInfo(String attemptDirName, String file, String owner) { + this.attemptDirName = attemptDirName; + filename = file; + user = owner; + } + + public Path getPath(Path rootPath) { + Path attemptPath = new Path(rootPath, attemptDirName); + return new Path(attemptPath, filename); + } + + public String getFilename() { + return filename; + } + + public boolean matchesGroupId(TimelineEntityGroupId groupId) { + return matchesGroupId(groupId.toString()); + } + + @InterfaceAudience.Private + @VisibleForTesting + boolean matchesGroupId(String groupId){ + // Return true if the group id is a segment (separated by _, ., or end of + // string) of the file name. + int pos = filename.indexOf(groupId); + if (pos < 0) { + return false; + } + return filename.length() == pos + groupId.length() + || ENTITY_FILE_NAME_DELIMITERS.contains(String.valueOf( + filename.charAt(pos + groupId.length()) + )); + } + + public void parseForStore(TimelineDataManager tdm, Path appDirPath, + boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper, + FileSystem fs) throws IOException { + LOG.debug("Parsing for log dir {} on attempt {}", appDirPath, + attemptDirName); + Path logPath = getPath(appDirPath); + if (fs.exists(logPath)) { + long startTime = Time.monotonicNow(); + try { + LOG.debug("Parsing {} at offset {}", logPath, offset); + long count = parsePath(tdm, logPath, appCompleted, jsonFactory, + objMapper, fs); + LOG.info("Parsed {} entities from {} in {} msec", + count, logPath, Time.monotonicNow() - startTime); + } catch (RuntimeException e) { + if (e.getCause() instanceof JsonParseException) { + // If AppLogs cannot parse this log, it may be corrupted + LOG.info("Log {} appears to be corrupted. Skip. ", logPath); + } + } + } else { + LOG.warn("{} no longer exists. Skip for scanning. ", logPath); + } + } + + private long parsePath(TimelineDataManager tdm, Path logPath, + boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper, + FileSystem fs) throws IOException { + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(user); + FSDataInputStream in = fs.open(logPath); + JsonParser parser = null; + try { + in.seek(offset); + try { + parser = jsonFactory.createJsonParser(in); + parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); + } catch (IOException e) { + // if app hasn't completed then there may be errors due to the + // incomplete file which are treated as EOF until app completes + if (appCompleted) { + throw e; + } else { + LOG.debug("Exception in parse path: {}", e.getMessage()); + return 0; + } + } + + return doParse(tdm, parser, objMapper, ugi, appCompleted); + } finally { + IOUtils.closeStream(parser); + IOUtils.closeStream(in); + } + } + + protected abstract long doParse(TimelineDataManager tdm, JsonParser parser, + ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted) + throws IOException; +} + +class EntityLogInfo extends LogInfo { + private static final Logger LOG = LoggerFactory.getLogger( + EntityGroupFSTimelineStore.class); + + public EntityLogInfo(String attemptId, + String file, String owner) { + super(attemptId, file, owner); + } + + @Override + protected long doParse(TimelineDataManager tdm, JsonParser parser, + ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted) + throws IOException { + long count = 0; + TimelineEntities entities = new TimelineEntities(); + ArrayList entityList = new ArrayList(1); + long bytesParsed; + long bytesParsedLastBatch = 0; + boolean postError = false; + try { + MappingIterator iter = objMapper.readValues(parser, + TimelineEntity.class); + + while (iter.hasNext()) { + TimelineEntity entity = iter.next(); + String etype = entity.getEntityType(); + String eid = entity.getEntityId(); + LOG.trace("Read entity {}", etype); + ++count; + bytesParsed = parser.getCurrentLocation().getCharOffset() + 1; + LOG.trace("Parser now at offset {}", bytesParsed); + + try { + LOG.debug("Adding {}({}) to store", eid, etype); + entityList.add(entity); + entities.setEntities(entityList); + TimelinePutResponse response = tdm.postEntities(entities, ugi); + for (TimelinePutResponse.TimelinePutError e + : response.getErrors()) { + LOG.warn("Error putting entity: {} ({}): {}", + e.getEntityId(), e.getEntityType(), e.getErrorCode()); + } + setOffset(getOffset() + bytesParsed - bytesParsedLastBatch); + bytesParsedLastBatch = bytesParsed; + entityList.clear(); + } catch (YarnException e) { + postError = true; + throw new IOException("Error posting entities", e); + } catch (IOException e) { + postError = true; + throw new IOException("Error posting entities", e); + } + } + } catch (IOException e) { + // if app hasn't completed then there may be errors due to the + // incomplete file which are treated as EOF until app completes + if (appCompleted || postError) { + throw e; + } + } catch (RuntimeException e) { + if (appCompleted || !(e.getCause() instanceof JsonParseException)) { + throw e; + } + } + return count; + } +} + +class DomainLogInfo extends LogInfo { + private static final Logger LOG = LoggerFactory.getLogger( + EntityGroupFSTimelineStore.class); + + public DomainLogInfo(String attemptDirName, String file, + String owner) { + super(attemptDirName, file, owner); + } + + protected long doParse(TimelineDataManager tdm, JsonParser parser, + ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted) + throws IOException { + long count = 0; + long bytesParsed; + long bytesParsedLastBatch = 0; + boolean putError = false; + try { + MappingIterator iter = objMapper.readValues(parser, + TimelineDomain.class); + + while (iter.hasNext()) { + TimelineDomain domain = iter.next(); + domain.setOwner(ugi.getShortUserName()); + LOG.trace("Read domain {}", domain.getId()); + ++count; + bytesParsed = parser.getCurrentLocation().getCharOffset() + 1; + LOG.trace("Parser now at offset {}", bytesParsed); + + try { + tdm.putDomain(domain, ugi); + setOffset(getOffset() + bytesParsed - bytesParsedLastBatch); + bytesParsedLastBatch = bytesParsed; + } catch (YarnException e) { + putError = true; + throw new IOException("Error posting domain", e); + } catch (IOException e) { + putError = true; + throw new IOException("Error posting domain", e); + } + } + } catch (IOException e) { + // if app hasn't completed then there may be errors due to the + // incomplete file which are treated as EOF until app completes + if (appCompleted || putError) { + throw e; + } + } catch (RuntimeException e) { + if (appCompleted || !(e.getCause() instanceof JsonParseException)) { + throw e; + } + } + return count; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java new file mode 100644 index 00000000000..9cdbf5fc2e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; + +import java.util.Collection; +import java.util.Set; +import java.util.SortedSet; + +/** + * Plugin to map a requested query ( or an Entity/set of Entities ) to a CacheID. + * The Cache ID is an identifier to the data set that needs to be queried to + * serve the response for the query. + */ +public abstract class TimelineEntityGroupPlugin { + + /** + * Get the {@link TimelineEntityGroupId}s for the data sets that need to be + * scanned to serve the query. + * + * @param entityType Entity Type being queried + * @param primaryFilter Primary filter being applied + * @param secondaryFilters Secondary filters being applied in the query + * @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId} + */ + public abstract Set getTimelineEntityGroupId( + String entityType, NameValuePair primaryFilter, + Collection secondaryFilters); + + /** + * Get the {@link TimelineEntityGroupId}s for the data sets that need to be + * scanned to serve the query. + * + * @param entityType Entity Type being queried + * @param entityId Entity Id being requested + * @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId} + */ + public abstract Set getTimelineEntityGroupId( + String entityId, + String entityType); + + + /** + * Get the {@link TimelineEntityGroupId}s for the data sets that need to be + * scanned to serve the query. + * + * @param entityType Entity Type being queried + * @param entityIds Entity Ids being requested + * @param eventTypes Event Types being requested + * @return {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId} + */ + public abstract Set getTimelineEntityGroupId( + String entityType, SortedSet entityIds, + Set eventTypes); + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java new file mode 100644 index 00000000000..6f61bba43d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java new file mode 100644 index 00000000000..71e26cbf548 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; + +import java.util.Collection; +import java.util.Set; +import java.util.SortedSet; + +class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin { + + private static TimelineEntityGroupId timelineEntityGroupId + = TimelineEntityGroupId.newInstance( + TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test"); + + @Override + public Set getTimelineEntityGroupId(String entityType, + NameValuePair primaryFilter, + Collection secondaryFilters) { + return Sets.newHashSet(timelineEntityGroupId); + } + + @Override + public Set getTimelineEntityGroupId(String entityId, + String entityType) { + return Sets.newHashSet(timelineEntityGroupId); + } + + @Override + public Set getTimelineEntityGroupId(String entityType, + SortedSet entityIds, + Set eventTypes) { + return Sets.newHashSet(timelineEntityGroupId); + } + + static TimelineEntityGroupId getStandardTimelineGroupId() { + return timelineEntityGroupId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java new file mode 100644 index 00000000000..e0379b1a811 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.util.MinimalPrettyPrinter; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +class PluginStoreTestUtils { + + static FSDataOutputStream createLogFile(Path logPath, FileSystem fs) + throws IOException { + FSDataOutputStream stream; + stream = fs.create(logPath, true); + return stream; + } + + static ObjectMapper createObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector()); + mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false); + return mapper; + } + + /** + * Create sample entities for testing + * @return two timeline entities in a {@link TimelineEntities} object + */ + static TimelineEntities generateTestEntities() { + TimelineEntities entities = new TimelineEntities(); + Map> primaryFilters = + new HashMap>(); + Set l1 = new HashSet(); + l1.add("username"); + Set l2 = new HashSet(); + l2.add(Integer.MAX_VALUE); + Set l3 = new HashSet(); + l3.add("123abc"); + Set l4 = new HashSet(); + l4.add((long)Integer.MAX_VALUE + 1l); + primaryFilters.put("user", l1); + primaryFilters.put("appname", l2); + primaryFilters.put("other", l3); + primaryFilters.put("long", l4); + Map secondaryFilters = new HashMap(); + secondaryFilters.put("startTime", 123456); + secondaryFilters.put("status", "RUNNING"); + Map otherInfo1 = new HashMap(); + otherInfo1.put("info1", "val1"); + otherInfo1.putAll(secondaryFilters); + + String entityId1 = "id_1"; + String entityType1 = "type_1"; + String entityId2 = "id_2"; + String entityType2 = "type_2"; + + Map> relatedEntities = + new HashMap>(); + relatedEntities.put(entityType2, Collections.singleton(entityId2)); + + TimelineEvent ev3 = createEvent(789l, "launch_event", null); + TimelineEvent ev4 = createEvent(0l, "init_event", null); + List events = new ArrayList(); + events.add(ev3); + events.add(ev4); + entities.addEntity(createEntity(entityId2, entityType2, 456l, events, null, + null, null, "domain_id_1")); + + TimelineEvent ev1 = createEvent(123l, "start_event", null); + entities.addEntity(createEntity(entityId1, entityType1, 123l, + Collections.singletonList(ev1), relatedEntities, primaryFilters, + otherInfo1, "domain_id_1")); + return entities; + } + + static void verifyTestEntities(TimelineDataManager tdm) + throws YarnException, IOException { + TimelineEntity entity1 = tdm.getEntity("type_1", "id_1", + EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + TimelineEntity entity2 = tdm.getEntity("type_2", "id_2", + EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertNotNull(entity1); + assertNotNull(entity2); + assertEquals("Failed to read out entity 1", + (Long) 123l, entity1.getStartTime()); + assertEquals("Failed to read out entity 2", + (Long) 456l, entity2.getStartTime()); + } + + /** + * Create a test entity + */ + static TimelineEntity createEntity(String entityId, String entityType, + Long startTime, List events, + Map> relatedEntities, + Map> primaryFilters, + Map otherInfo, String domainId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(startTime); + entity.setEvents(events); + if (relatedEntities != null) { + for (Map.Entry> e : relatedEntities.entrySet()) { + for (String v : e.getValue()) { + entity.addRelatedEntity(e.getKey(), v); + } + } + } else { + entity.setRelatedEntities(null); + } + entity.setPrimaryFilters(primaryFilters); + entity.setOtherInfo(otherInfo); + entity.setDomainId(domainId); + return entity; + } + + /** + * Create a test event + */ + static TimelineEvent createEvent(long timestamp, String type, Map info) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(timestamp); + event.setEventType(type); + event.setEventInfo(info); + return event; + } + + /** + * Write timeline entities to a file system + * @param entities + * @param logPath + * @param fs + * @throws IOException + */ + static void writeEntities(TimelineEntities entities, Path logPath, + FileSystem fs) throws IOException { + FSDataOutputStream outStream = createLogFile(logPath, fs); + JsonGenerator jsonGenerator + = (new JsonFactory()).createJsonGenerator(outStream); + jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + ObjectMapper objMapper = createObjectMapper(); + for (TimelineEntity entity : entities.getEntities()) { + objMapper.writeValue(jsonGenerator, entity); + } + outStream.close(); + } + + static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore store) { + TimelineACLsManager aclManager = new TimelineACLsManager(config); + TimelineDataManager tdm = new TimelineDataManager(store, aclManager); + tdm.init(config); + return tdm; + } + + static TimelineDataManager getTdmWithMemStore(Configuration config) { + TimelineStore store = new MemoryTimelineStore(); + TimelineDataManager tdm = getTdmWithStore(config, store); + return tdm; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java new file mode 100644 index 00000000000..e43b705b031 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; + +import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { + + private static final String SAMPLE_APP_NAME = "1234_5678"; + + static final ApplicationId TEST_APPLICATION_ID + = ConverterUtils.toApplicationId( + ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME); + + private static final String TEST_APP_DIR_NAME + = TEST_APPLICATION_ID.toString(); + private static final String TEST_ATTEMPT_DIR_NAME + = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1"; + private static final String TEST_SUMMARY_LOG_FILE_NAME + = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test"; + private static final String TEST_ENTITY_LOG_FILE_NAME + = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX + + EntityGroupPlugInForTest.getStandardTimelineGroupId(); + private static final String TEST_DOMAIN_LOG_FILE_NAME + = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test"; + + private static final Path TEST_ROOT_DIR + = new Path(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestEntityGroupFSTimelineStore.class.getSimpleName()); + private static final Path TEST_APP_DIR_PATH + = new Path(TEST_ROOT_DIR, TEST_APP_DIR_NAME); + private static final Path TEST_ATTEMPT_DIR_PATH + = new Path(TEST_APP_DIR_PATH, TEST_ATTEMPT_DIR_NAME); + private static final Path TEST_DONE_DIR_PATH + = new Path(TEST_ROOT_DIR, "done"); + + private static Configuration config = new YarnConfiguration(); + private static MiniDFSCluster hdfsCluster; + private static FileSystem fs; + private EntityGroupFSTimelineStore store; + private TimelineEntity entityNew; + + @Rule + public TestName currTestName = new TestName(); + + @BeforeClass + public static void setupClass() throws Exception { + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + config.set( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, + "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER"); + config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, + TEST_DONE_DIR_PATH.toString()); + config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster + = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build(); + fs = hdfsCluster.getFileSystem(); + } + + @Before + public void setup() throws Exception { + createTestFiles(); + store = new EntityGroupFSTimelineStore(); + if (currTestName.getMethodName().contains("Plugin")) { + config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES, + EntityGroupPlugInForTest.class.getName()); + } + store.init(config); + store.start(); + store.setFs(fs); + } + + @After + public void tearDown() throws Exception { + fs.delete(TEST_APP_DIR_PATH, true); + store.stop(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + hdfsCluster.shutdown(); + FileContext fileContext = FileContext.getLocalFSFileContext(); + fileContext.delete(new Path( + config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true); + } + + @Test + public void testAppLogsScanLogs() throws Exception { + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + appLogs.scanForLogs(); + List summaryLogs = appLogs.getSummaryLogs(); + List detailLogs = appLogs.getDetailLogs(); + assertEquals(2, summaryLogs.size()); + assertEquals(1, detailLogs.size()); + + for (LogInfo log : summaryLogs) { + String fileName = log.getFilename(); + assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME) + || fileName.equals(TEST_DOMAIN_LOG_FILE_NAME)); + } + + for (LogInfo log : detailLogs) { + String fileName = log.getFilename(); + assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME); + } + } + + @Test + public void testMoveToDone() throws Exception { + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + Path pathBefore = appLogs.getAppDirPath(); + appLogs.moveToDone(); + Path pathAfter = appLogs.getAppDirPath(); + assertNotEquals(pathBefore, pathAfter); + assertTrue(pathAfter.toString().contains(TEST_DONE_DIR_PATH.toString())); + } + + @Test + public void testParseSummaryLogs() throws Exception { + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + appLogs.scanForLogs(); + appLogs.parseSummaryLogs(tdm); + PluginStoreTestUtils.verifyTestEntities(tdm); + } + + @Test + public void testCleanLogs() throws Exception { + // Create test dirs and files + // Irrelevant file, should not be reclaimed + Path irrelevantFilePath = new Path( + TEST_DONE_DIR_PATH, "irrelevant.log"); + FSDataOutputStream stream = fs.create(irrelevantFilePath); + stream.close(); + // Irrelevant directory, should not be reclaimed + Path irrelevantDirPath = new Path(TEST_DONE_DIR_PATH, "irrelevant"); + fs.mkdirs(irrelevantDirPath); + + Path doneAppHomeDir = new Path(new Path(TEST_DONE_DIR_PATH, "0000"), "001"); + // First application, untouched after creation + Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME); + Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirClean); + Path filePath = new Path(attemptDirClean, "test.log"); + stream = fs.create(filePath); + stream.close(); + // Second application, one file touched after creation + Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1"); + Path attemptDirHoldByFile + = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirHoldByFile); + Path filePathHold = new Path(attemptDirHoldByFile, "test1.log"); + stream = fs.create(filePathHold); + stream.close(); + // Third application, one dir touched after creation + Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2"); + Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirHoldByDir); + Path dirPathHold = new Path(attemptDirHoldByDir, "hold"); + fs.mkdirs(dirPathHold); + // Fourth application, empty dirs + Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3"); + Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(attemptDirEmpty); + Path dirPathEmpty = new Path(attemptDirEmpty, "empty"); + fs.mkdirs(dirPathEmpty); + + // Should retain all logs after this run + EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000); + assertTrue(fs.exists(irrelevantDirPath)); + assertTrue(fs.exists(irrelevantFilePath)); + assertTrue(fs.exists(filePath)); + assertTrue(fs.exists(filePathHold)); + assertTrue(fs.exists(dirPathHold)); + assertTrue(fs.exists(dirPathEmpty)); + + // Make sure the created dir is old enough + Thread.sleep(2000); + // Touch the second application + stream = fs.append(filePathHold); + stream.writeBytes("append"); + stream.close(); + // Touch the third application by creating a new dir + fs.mkdirs(new Path(dirPathHold, "holdByMe")); + + EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000); + + // Verification after the second cleaner call + assertTrue(fs.exists(irrelevantDirPath)); + assertTrue(fs.exists(irrelevantFilePath)); + assertTrue(fs.exists(filePathHold)); + assertTrue(fs.exists(dirPathHold)); + assertTrue(fs.exists(doneAppHomeDir)); + + // appDirClean and appDirEmpty should be cleaned up + assertFalse(fs.exists(appDirClean)); + assertFalse(fs.exists(appDirEmpty)); + } + + @Test + public void testPluginRead() throws Exception { + // Verify precondition + assertEquals(EntityGroupPlugInForTest.class.getName(), + store.getConfig().get( + YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES)); + // Load data and cache item, prepare timeline store by making a cache item + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + EntityCacheItem cacheItem = new EntityCacheItem(config, fs); + cacheItem.setAppLogs(appLogs); + store.setCachedLogs( + EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem); + // Generate TDM + TimelineDataManager tdm + = PluginStoreTestUtils.getTdmWithStore(config, store); + + // Verify single entity read + TimelineEntity entity3 = tdm.getEntity("type_3", "id_3", + EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertNotNull(entity3); + assertEquals(entityNew.getStartTime(), entity3.getStartTime()); + // Verify multiple entities read + TimelineEntities entities = tdm.getEntities("type_3", null, null, null, + null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertEquals(entities.getEntities().size(), 1); + for (TimelineEntity entity : entities.getEntities()) { + assertEquals(entityNew.getStartTime(), entity.getStartTime()); + } + } + + @Test + public void testSummaryRead() throws Exception { + // Load data + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH, + AppState.COMPLETED); + TimelineDataManager tdm + = PluginStoreTestUtils.getTdmWithStore(config, store); + appLogs.scanForLogs(); + appLogs.parseSummaryLogs(tdm); + + // Verify single entity read + PluginStoreTestUtils.verifyTestEntities(tdm); + // Verify multiple entities read + TimelineEntities entities = tdm.getEntities("type_1", null, null, null, + null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertEquals(entities.getEntities().size(), 1); + for (TimelineEntity entity : entities.getEntities()) { + assertEquals((Long) 123l, entity.getStartTime()); + } + + } + + private void createTestFiles() throws IOException { + TimelineEntities entities = PluginStoreTestUtils.generateTestEntities(); + PluginStoreTestUtils.writeEntities(entities, + new Path(TEST_ATTEMPT_DIR_PATH, TEST_SUMMARY_LOG_FILE_NAME), fs); + + entityNew = PluginStoreTestUtils + .createEntity("id_3", "type_3", 789l, null, null, + null, null, "domain_id_1"); + TimelineEntities entityList = new TimelineEntities(); + entityList.addEntity(entityNew); + PluginStoreTestUtils.writeEntities(entityList, + new Path(TEST_ATTEMPT_DIR_PATH, TEST_ENTITY_LOG_FILE_NAME), fs); + + FSDataOutputStream out = fs.create( + new Path(TEST_ATTEMPT_DIR_PATH, TEST_DOMAIN_LOG_FILE_NAME)); + out.close(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java new file mode 100644 index 00000000000..fa6fcc7dc8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.util.MinimalPrettyPrinter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.EnumSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestLogInfo { + + private static final Path TEST_ROOT_DIR = new Path( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestLogInfo.class.getSimpleName()); + + private static final String TEST_ATTEMPT_DIR_NAME = "test_app"; + private static final String TEST_ENTITY_FILE_NAME = "test_entity"; + private static final String TEST_DOMAIN_FILE_NAME = "test_domain"; + private static final String TEST_BROKEN_FILE_NAME = "test_broken"; + + private Configuration config = new YarnConfiguration(); + private MiniDFSCluster hdfsCluster; + private FileSystem fs; + private ObjectMapper objMapper; + + private JsonFactory jsonFactory = new JsonFactory(); + private JsonGenerator jsonGenerator; + private FSDataOutputStream outStream = null; + private FSDataOutputStream outStreamDomain = null; + + private TimelineDomain testDomain; + + private static final short FILE_LOG_DIR_PERMISSIONS = 0770; + + @Before + public void setup() throws Exception { + config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); + HdfsConfiguration hdfsConfig = new HdfsConfiguration(); + hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build(); + fs = hdfsCluster.getFileSystem(); + Path testAppDirPath = new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME); + fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS)); + objMapper = PluginStoreTestUtils.createObjectMapper(); + + TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities(); + writeEntitiesLeaveOpen(testEntities, + new Path(testAppDirPath, TEST_ENTITY_FILE_NAME)); + + testDomain = new TimelineDomain(); + testDomain.setId("domain_1"); + testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName()); + testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName()); + testDomain.setDescription("description"); + writeDomainLeaveOpen(testDomain, + new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME)); + + writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME)); + } + + @After + public void tearDown() throws Exception { + jsonGenerator.close(); + outStream.close(); + outStreamDomain.close(); + hdfsCluster.shutdown(); + } + + @Test + public void testMatchesGroupId() throws Exception { + String testGroupId = "app1_group1"; + // Match + EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + "app1_group1", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + "test_app1_group1", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + // Unmatch + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + // Check delimiters + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat", + UserGroupInformation.getLoginUser().getUserName()); + assertTrue(testLogInfo.matchesGroupId(testGroupId)); + // Check file names shorter than group id + testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2", + UserGroupInformation.getLoginUser().getUserName()); + assertFalse(testLogInfo.matchesGroupId(testGroupId)); + } + + @Test + public void testParseEntity() throws Exception { + // Load test data + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_ENTITY_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + // Verify for the first batch + PluginStoreTestUtils.verifyTestEntities(tdm); + // Load new data + TimelineEntity entityNew = PluginStoreTestUtils + .createEntity("id_3", "type_3", 789l, null, null, + null, null, "domain_id_1"); + TimelineEntities entityList = new TimelineEntities(); + entityList.addEntity(entityNew); + writeEntitiesLeaveOpen(entityList, + new Path(new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME), + TEST_ENTITY_FILE_NAME)); + testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + // Verify the newly added data + TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(), + entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class), + UserGroupInformation.getLoginUser()); + assertNotNull(entity3); + assertEquals("Failed to read out entity new", + entityNew.getStartTime(), entity3.getStartTime()); + tdm.close(); + } + + @Test + public void testParseBrokenEntity() throws Exception { + // Load test data + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_BROKEN_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_BROKEN_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + // Try parse, should not fail + testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + tdm.close(); + } + + @Test + public void testParseDomain() throws Exception { + // Load test data + TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME, + TEST_DOMAIN_FILE_NAME, + UserGroupInformation.getLoginUser().getUserName()); + domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper, + fs); + // Verify domain data + TimelineDomain resultDomain = tdm.getDomain("domain_1", + UserGroupInformation.getLoginUser()); + assertNotNull(resultDomain); + assertEquals(testDomain.getReaders(), resultDomain.getReaders()); + assertEquals(testDomain.getOwner(), resultDomain.getOwner()); + assertEquals(testDomain.getDescription(), resultDomain.getDescription()); + } + + private void writeBrokenFile(Path logPath) throws IOException { + FSDataOutputStream out = null; + try { + String broken = "{ broken { [[]} broken"; + out = PluginStoreTestUtils.createLogFile(logPath, fs); + out.write(broken.getBytes(Charset.forName("UTF-8"))); + out.close(); + out = null; + } finally { + if (out != null) { + out.close(); + } + } + } + + // TestLogInfo needs to maintain opened hdfs files so we have to build our own + // write methods + private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath) + throws IOException { + if (outStream == null) { + outStream = PluginStoreTestUtils.createLogFile(logPath, fs); + jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream); + jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + } + for (TimelineEntity entity : entities.getEntities()) { + objMapper.writeValue(jsonGenerator, entity); + } + outStream.hflush(); + } + + private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath) + throws IOException { + if (outStreamDomain == null) { + outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs); + } + // Write domain uses its own json generator to isolate from entity writers + JsonGenerator jsonGeneratorLocal + = (new JsonFactory()).createJsonGenerator(outStreamDomain); + jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + objMapper.writeValue(jsonGeneratorLocal, domain); + outStreamDomain.hflush(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index 44631f7438d..d83ac242cec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -42,5 +42,6 @@ hadoop-yarn-server-sharedcachemanager hadoop-yarn-server-tests hadoop-yarn-server-applicationhistoryservice + hadoop-yarn-server-timeline-pluginstorage