From 6e97a3c9686b847e6047e12b7e53a8316e2bebfa Mon Sep 17 00:00:00 2001 From: Junping Du Date: Wed, 23 Dec 2015 05:26:51 -0800 Subject: [PATCH] YARN-4234. New put APIs in TimelineClient for ats v1.5. Contributed by Xuan Gong. (cherry picked from commit 882f2f04644a13cadb93070d5545f7a4f8691fde) --- hadoop-yarn-project/CHANGES.txt | 3 + .../timeline/TimelineEntityGroupId.java | 163 ++++ .../hadoop/yarn/conf/YarnConfiguration.java | 53 +- .../yarn/client/api/TimelineClient.java | 43 + .../client/api/impl/DirectTimelineWriter.java | 66 ++ .../api/impl/FileSystemTimelineWriter.java | 847 ++++++++++++++++++ .../client/api/impl/TimelineClientImpl.java | 128 +-- .../yarn/client/api/impl/TimelineWriter.java | 142 +++ .../yarn/api/TestTimelineEntityGroupId.java | 52 ++ .../client/api/impl/TestTimelineClient.java | 59 +- .../api/impl/TestTimelineClientForATS1_5.java | 225 +++++ .../TestTimelineWebServicesWithSSL.java | 21 +- q | 222 +++++ 13 files changed, 1931 insertions(+), 93 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java create mode 100644 q diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b6624e8dd0d..7315ba2befc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -233,6 +233,9 @@ Release 2.8.0 - UNRELEASED YARN-3458. CPU resource monitoring in Windows. (Inigo Goiri via cnauroth) + YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via + junping_du) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java new file mode 100644 index 00000000000..984e6e2e25a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java @@ -0,0 +1,163 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records.timeline; + +import java.util.Iterator; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import com.google.common.base.Splitter; + +/** + *

TimelineEntityGroupId is an abstract way for + * timeline service users to represent “a group of related timeline data. + * For example, all entities that represents one data flow DAG execution + * can be grouped into one timeline entity group.

+ */ +@Public +@Unstable +public class TimelineEntityGroupId implements + Comparable { + + private static final Splitter SPLITTER = Splitter.on('_').trimResults(); + + private ApplicationId applicationId; + private String id; + + @Private + @Unstable + public static final String TIMELINE_ENTITY_GROUPID_STR_PREFIX = + "timelineEntityGroupId"; + + public TimelineEntityGroupId() { + + } + + public static TimelineEntityGroupId newInstance(ApplicationId applicationId, + String id) { + TimelineEntityGroupId timelineEntityGroupId = + new TimelineEntityGroupId(); + timelineEntityGroupId.setApplicationId(applicationId); + timelineEntityGroupId.setTimelineEntityGroupId(id); + return timelineEntityGroupId; + } + + /** + * Get the ApplicationId of the + * TimelineEntityGroupId. + * + * @return ApplicationId of the + * TimelineEntityGroupId + */ + public ApplicationId getApplicationId() { + return this.applicationId; + } + + public void setApplicationId(ApplicationId appID) { + this.applicationId = appID; + } + + /** + * Get the timelineEntityGroupId. + * + * @return timelineEntityGroupId + */ + public String getTimelineEntityGroupId() { + return this.id; + } + + @Private + @Unstable + protected void setTimelineEntityGroupId(String timelineEntityGroupId) { + this.id = timelineEntityGroupId; + } + + @Override + public int hashCode() { + int result = getTimelineEntityGroupId().hashCode(); + result = 31 * result + getApplicationId().hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelineEntityGroupId otherObject = (TimelineEntityGroupId) obj; + if (!this.getApplicationId().equals(otherObject.getApplicationId())) { + return false; + } + if (!this.getTimelineEntityGroupId().equals( + otherObject.getTimelineEntityGroupId())) { + return false; + } + return true; + } + + @Override + public int compareTo(TimelineEntityGroupId other) { + int compareAppIds = + this.getApplicationId().compareTo(other.getApplicationId()); + if (compareAppIds == 0) { + return this.getTimelineEntityGroupId().compareTo( + other.getTimelineEntityGroupId()); + } else { + return compareAppIds; + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(TIMELINE_ENTITY_GROUPID_STR_PREFIX + "_"); + ApplicationId appId = getApplicationId(); + sb.append(appId.getClusterTimestamp()).append("_"); + sb.append(appId.getId()).append("_"); + sb.append(getTimelineEntityGroupId()); + return sb.toString(); + } + + public static TimelineEntityGroupId + fromString(String timelineEntityGroupIdStr) { + StringBuffer buf = new StringBuffer(); + Iterator it = SPLITTER.split(timelineEntityGroupIdStr).iterator(); + if (!it.next().equals(TIMELINE_ENTITY_GROUPID_STR_PREFIX)) { + throw new IllegalArgumentException( + "Invalid TimelineEntityGroupId prefix: " + timelineEntityGroupIdStr); + } + ApplicationId appId = + ApplicationId.newInstance(Long.parseLong(it.next()), + Integer.parseInt(it.next())); + buf.append(it.next()); + while (it.hasNext()) { + buf.append("_"); + buf.append(it.next()); + } + return TimelineEntityGroupId.newInstance(appId, buf.toString()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1f50d9b40eb..66a8110ff41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1581,6 +1581,10 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX = TIMELINE_SERVICE_PREFIX + "ui-web-path."; + /** Timeline client settings */ + public static final String TIMELINE_SERVICE_CLIENT_PREFIX = + TIMELINE_SERVICE_PREFIX + "client."; + /** * Path to war file or static content directory for this UI * (For pluggable UIs). @@ -1588,6 +1592,45 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX = TIMELINE_SERVICE_PREFIX + "ui-on-disk-path."; + /** + * The setting for timeline service v1.5 + */ + public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX = + TIMELINE_SERVICE_PREFIX + "entity-group-fs-store."; + + public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "active-dir"; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT = + "/tmp/entity-file-history/active"; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec"; + public static final String + DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC = + "2000, 500"; + + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types"; + + public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS = + TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs"; + public static final long + TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10; + + public static final String TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS = + TIMELINE_SERVICE_CLIENT_PREFIX + "fd-clean-interval-secs"; + public static final long + TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60; + + public static final String TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS = + TIMELINE_SERVICE_CLIENT_PREFIX + "fd-retain-secs"; + public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT = + 5*60; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private @@ -1628,8 +1671,8 @@ public class YarnConfiguration extends Configuration { public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type"; @Private - public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = - "none"; + public static final String + DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = "none"; /** The setting that controls whether timeline service is enabled or not. */ public static final String TIMELINE_SERVICE_ENABLED = @@ -1678,7 +1721,7 @@ public class YarnConfiguration extends Configuration { APPLICATION_HISTORY_PREFIX + "max-applications"; public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000; - /** Timeline service store class */ + /** Timeline service store class. */ public static final String TIMELINE_SERVICE_STORE = TIMELINE_SERVICE_PREFIX + "store-class"; @@ -1791,10 +1834,6 @@ public class YarnConfiguration extends Configuration { public static final boolean TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false; - /** Timeline client settings */ - public static final String TIMELINE_SERVICE_CLIENT_PREFIX = - TIMELINE_SERVICE_PREFIX + "client."; - /** Timeline client call, max retries (-1 means no limit) */ public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES = TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index a3766f9e69e..258b9f5c326 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -26,8 +26,10 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -78,6 +80,28 @@ public abstract class TimelineClient extends AbstractService { public abstract TimelinePutResponse putEntities( TimelineEntity... entities) throws IOException, YarnException; + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * server. It is a blocking API. The method will not return until it gets the + * response from the timeline server. + * + * This API is only for timeline service v1.5 + *

+ * + * @param appAttemptId {@link ApplicationAttemptId} + * @param groupId {@link TimelineEntityGroupId} + * @param entities + * the collection of {@link TimelineEntity} + * @return the error information if the sent entities are not correctly stored + * @throws IOException + * @throws YarnException + */ + @Public + public abstract TimelinePutResponse putEntities( + ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, + TimelineEntity... entities) throws IOException, YarnException; + /** *

* Send the information of a domain to the timeline server. It is a @@ -94,6 +118,25 @@ public abstract class TimelineClient extends AbstractService { public abstract void putDomain( TimelineDomain domain) throws IOException, YarnException; + /** + *

+ * Send the information of a domain to the timeline server. It is a + * blocking API. The method will not return until it gets the response from + * the timeline server. + * + * This API is only for timeline service v1.5 + *

+ * + * @param domain + * an {@link TimelineDomain} object + * @param appAttemptId {@link ApplicationAttemptId} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putDomain(ApplicationAttemptId appAttemptId, + TimelineDomain domain) throws IOException, YarnException; + /** *

* Get a delegation token so as to be able to talk to the timeline server in a diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java new file mode 100644 index 00000000000..abc2a280708 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.sun.jersey.api.client.Client; + +/** + * A simple writer class for storing Timeline data into Leveldb store. + */ +@Private +@Unstable +public class DirectTimelineWriter extends TimelineWriter{ + + private static final Log LOG = LogFactory + .getLog(DirectTimelineWriter.class); + + public DirectTimelineWriter(UserGroupInformation authUgi, + Client client, URI resURI) { + super(authUgi, client, resURI); + } + + @Override + public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, + TimelineEntityGroupId groupId, TimelineEntity... entities) + throws IOException, YarnException { + throw new IOException("Not supported"); + } + + @Override + public void putDomain(ApplicationAttemptId appAttemptId, + TimelineDomain domain) throws IOException, YarnException { + throw new IOException("Not supported"); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java new file mode 100644 index 00000000000..1c295e17e39 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -0,0 +1,847 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig.Feature; +import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; +import org.codehaus.jackson.util.MinimalPrettyPrinter; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import com.sun.jersey.api.client.Client; + +/** + * A simple writer class for storing Timeline data in any storage that + * implements a basic FileSystem interface. + * This writer is used for ATSv1.5. + */ +@Private +@Unstable +public class FileSystemTimelineWriter extends TimelineWriter{ + + private static final Log LOG = LogFactory + .getLog(FileSystemTimelineWriter.class); + + // This is temporary solution. The configuration will be deleted once we have + // the FileSystem API to check whether append operation is supported or not. + private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + = YarnConfiguration.TIMELINE_SERVICE_PREFIX + + "entity-file.fs-support-append"; + + // App log directory must be readable by group so server can access logs + // and writable by group so it can be deleted by server + private static final short APP_LOG_DIR_PERMISSIONS = 0770; + // Logs must be readable by group so server can access them + private static final short FILE_LOG_PERMISSIONS = 0640; + private static final String DOMAIN_LOG_PREFIX = "domainlog-"; + private static final String SUMMARY_LOG_PREFIX = "summarylog-"; + private static final String ENTITY_LOG_PREFIX = "entitylog-"; + + private Path activePath = null; + private FileSystem fs = null; + private Set summaryEntityTypes; + private ObjectMapper objMapper = null; + private long flushIntervalSecs; + private long cleanIntervalSecs; + private long ttl; + private LogFDsCache logFDsCache = null; + private boolean isAppendSupported; + + public FileSystemTimelineWriter(Configuration conf, + UserGroupInformation authUgi, Client client, URI resURI) + throws IOException { + super(authUgi, client, resURI); + + Configuration fsConf = new Configuration(conf); + fsConf.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = + fsConf.get(YarnConfiguration. + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC, + YarnConfiguration. + DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC); + fsConf.set("dfs.client.retry.policy.spec", retryPolicy); + + activePath = new Path(fsConf.get( + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT)); + + String scheme = activePath.toUri().getScheme(); + if (scheme == null) { + scheme = FileSystem.getDefaultUri(fsConf).getScheme(); + } + if (scheme != null) { + String disableCacheName = String.format("fs.%s.impl.disable.cache", + scheme); + fsConf.setBoolean(disableCacheName, true); + } + + fs = activePath.getFileSystem(fsConf); + if (!fs.exists(activePath)) { + throw new IOException(activePath + " does not exist"); + } + + summaryEntityTypes = new HashSet( + conf.getStringCollection(YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES)); + + flushIntervalSecs = conf.getLong( + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS, + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT); + + cleanIntervalSecs = conf.getLong( + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS, + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT); + + ttl = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS, + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT); + + logFDsCache = + new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl); + + this.isAppendSupported = + conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); + + objMapper = createObjectMapper(); + + if (LOG.isDebugEnabled()) { + StringBuilder debugMSG = new StringBuilder(); + debugMSG.append( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS + + "=" + flushIntervalSecs + ", " + + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS + + "=" + cleanIntervalSecs + ", " + + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + + "=" + ttl + ", " + + TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + + "=" + isAppendSupported + ", " + + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR + + "=" + activePath); + + if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) { + debugMSG.append(", " + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES + + " = " + summaryEntityTypes); + } + LOG.debug(debugMSG.toString()); + } + } + + @Override + public TimelinePutResponse putEntities( + ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, + TimelineEntity... entities) throws IOException, YarnException { + if (appAttemptId == null) { + return putEntities(entities); + } + + List entitiesToDBStore = new ArrayList(); + List entitiesToSummaryCache + = new ArrayList(); + List entitiesToEntityCache + = new ArrayList(); + Path attemptDir = createAttemptDir(appAttemptId); + + for (TimelineEntity entity : entities) { + if (summaryEntityTypes.contains(entity.getEntityType())) { + entitiesToSummaryCache.add(entity); + } else { + if (groupId != null) { + entitiesToEntityCache.add(entity); + } else { + entitiesToDBStore.add(entity); + } + } + } + + if (!entitiesToSummaryCache.isEmpty()) { + Path summaryLogPath = + new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Writing summary log for " + appAttemptId.toString() + " to " + + summaryLogPath); + } + this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper, + appAttemptId, entitiesToSummaryCache, isAppendSupported); + } + + if (!entitiesToEntityCache.isEmpty()) { + Path entityLogPath = + new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Writing entity log for " + groupId.toString() + " to " + + entityLogPath); + } + this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper, + appAttemptId, groupId, entitiesToEntityCache, isAppendSupported); + } + + if (!entitiesToDBStore.isEmpty()) { + putEntities(entitiesToDBStore.toArray( + new TimelineEntity[entitiesToDBStore.size()])); + } + + return new TimelinePutResponse(); + } + + @Override + public void putDomain(ApplicationAttemptId appAttemptId, + TimelineDomain domain) throws IOException, YarnException { + if (appAttemptId == null) { + putDomain(domain); + } else { + writeDomain(appAttemptId, domain); + } + } + + @Override + public void close() throws Exception { + if (this.logFDsCache != null) { + this.logFDsCache.close(); + } + } + + private ObjectMapper createObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector()); + mapper.setSerializationInclusion(Inclusion.NON_NULL); + mapper.configure(Feature.CLOSE_CLOSEABLE, false); + return mapper; + } + + private Path createAttemptDir(ApplicationAttemptId appAttemptId) + throws IOException { + Path appDir = createApplicationDir(appAttemptId.getApplicationId()); + + Path attemptDir = new Path(appDir, appAttemptId.toString()); + if (!fs.exists(attemptDir)) { + FileSystem.mkdirs(fs, attemptDir, new FsPermission( + APP_LOG_DIR_PERMISSIONS)); + } + return attemptDir; + } + + private Path createApplicationDir(ApplicationId appId) throws IOException { + Path appDir = + new Path(activePath, appId.toString()); + if (!fs.exists(appDir)) { + FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS)); + } + return appDir; + } + + private void writeDomain(ApplicationAttemptId appAttemptId, + TimelineDomain domain) throws IOException { + Path domainLogPath = + new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX + + appAttemptId.toString()); + LOG.info("Writing domains for " + appAttemptId.toString() + " to " + + domainLogPath); + this.logFDsCache.writeDomainLog( + fs, domainLogPath, objMapper, domain, isAppendSupported); + } + + private static class DomainLogFD extends LogFD { + public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, + boolean isAppendSupported) throws IOException { + super(fs, logPath, objMapper, isAppendSupported); + } + + public void writeDomain(TimelineDomain domain) + throws IOException { + getObjectMapper().writeValue(getJsonGenerator(), domain); + updateLastModifiedTime(System.currentTimeMillis()); + } + } + + private static class EntityLogFD extends LogFD { + public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, + boolean isAppendSupported) throws IOException { + super(fs, logPath, objMapper, isAppendSupported); + } + + public void writeEntities(List entities) + throws IOException { + if (writerClosed()) { + prepareForWrite(); + } + for (TimelineEntity entity : entities) { + getObjectMapper().writeValue(getJsonGenerator(), entity); + } + updateLastModifiedTime(System.currentTimeMillis()); + } + } + + private static class LogFD { + private FSDataOutputStream stream; + private ObjectMapper objMapper; + private JsonGenerator jsonGenerator; + private long lastModifiedTime; + private final boolean isAppendSupported; + private final ReentrantLock fdLock = new ReentrantLock(); + private final FileSystem fs; + private final Path logPath; + + public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, + boolean isAppendSupported) throws IOException { + this.fs = fs; + this.logPath = logPath; + this.isAppendSupported = isAppendSupported; + this.objMapper = objMapper; + prepareForWrite(); + } + + public void close() { + if (stream != null) { + IOUtils.cleanup(LOG, jsonGenerator); + IOUtils.cleanup(LOG, stream); + stream = null; + jsonGenerator = null; + } + } + + public void flush() throws IOException { + if (stream != null) { + stream.hflush(); + } + } + + public long getLastModifiedTime() { + return this.lastModifiedTime; + } + + protected void prepareForWrite() throws IOException{ + this.stream = createLogFileStream(fs, logPath); + this.jsonGenerator = new JsonFactory().createJsonGenerator(stream); + this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + this.lastModifiedTime = System.currentTimeMillis(); + } + + protected boolean writerClosed() { + return stream == null; + } + + private FSDataOutputStream createLogFileStream(FileSystem fileSystem, + Path logPathToCreate) + throws IOException { + FSDataOutputStream streamToCreate; + if (!isAppendSupported) { + logPathToCreate = + new Path(logPathToCreate.getParent(), + (logPathToCreate.getName() + "_" + System.currentTimeMillis())); + } + if (!fileSystem.exists(logPathToCreate)) { + streamToCreate = fileSystem.create(logPathToCreate, false); + fileSystem.setPermission(logPathToCreate, + new FsPermission(FILE_LOG_PERMISSIONS)); + } else { + streamToCreate = fileSystem.append(logPathToCreate); + } + return streamToCreate; + } + + public void lock() { + this.fdLock.lock(); + } + + public void unlock() { + this.fdLock.unlock(); + } + + protected JsonGenerator getJsonGenerator() { + return jsonGenerator; + } + + protected ObjectMapper getObjectMapper() { + return objMapper; + } + + protected void updateLastModifiedTime(long updatedTime) { + this.lastModifiedTime = updatedTime; + } + } + + private static class LogFDsCache implements Closeable, Flushable{ + private DomainLogFD domainLogFD; + private Map summanyLogFDs; + private Map> entityLogFDs; + private Timer flushTimer; + private FlushTimerTask flushTimerTask; + private Timer cleanInActiveFDsTimer; + private CleanInActiveFDsTask cleanInActiveFDsTask; + private final long ttl; + private final ReentrantLock domainFDLocker = new ReentrantLock(); + private final ReentrantLock summaryTableLocker = new ReentrantLock(); + private final ReentrantLock entityTableLocker = new ReentrantLock(); + private final ReentrantLock summaryTableCopyLocker = new ReentrantLock(); + private final ReentrantLock entityTableCopyLocker = new ReentrantLock(); + private volatile boolean serviceStopped = false; + + public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, + long ttl) { + domainLogFD = null; + summanyLogFDs = new HashMap(); + entityLogFDs = new HashMap>(); + this.flushTimer = + new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", + true); + this.flushTimerTask = new FlushTimerTask(); + this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000, + flushIntervalSecs * 1000); + + this.cleanInActiveFDsTimer = + new Timer(LogFDsCache.class.getSimpleName() + + "cleanInActiveFDsTimer", true); + this.cleanInActiveFDsTask = new CleanInActiveFDsTask(); + this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask, + cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); + this.ttl = ttl * 1000; + } + + @Override + public void flush() throws IOException { + try { + this.domainFDLocker.lock(); + if (domainLogFD != null) { + domainLogFD.flush(); + } + } finally { + this.domainFDLocker.unlock(); + } + + flushSummaryFDMap(copySummaryLogFDs(summanyLogFDs)); + + flushEntityFDMap(copyEntityLogFDs(entityLogFDs)); + } + + private Map copySummaryLogFDs( + Map summanyLogFDsToCopy) { + try { + summaryTableCopyLocker.lock(); + return new HashMap( + summanyLogFDsToCopy); + } finally { + summaryTableCopyLocker.unlock(); + } + } + + private Map> copyEntityLogFDs(Map> entityLogFDsToCopy) { + try { + entityTableCopyLocker.lock(); + return new HashMap>(entityLogFDsToCopy); + } finally { + entityTableCopyLocker.unlock(); + } + } + + private void flushSummaryFDMap(Map logFDs) throws IOException { + if (!logFDs.isEmpty()) { + for (Entry logFDEntry : logFDs + .entrySet()) { + EntityLogFD logFD = logFDEntry.getValue(); + try { + logFD.lock(); + logFD.flush(); + } finally { + logFD.unlock(); + } + } + } + } + + private void flushEntityFDMap(Map> logFDs) throws IOException { + if (!logFDs.isEmpty()) { + for (Entry> logFDMapEntry : logFDs.entrySet()) { + HashMap logFDMap + = logFDMapEntry.getValue(); + for (Entry logFDEntry + : logFDMap.entrySet()) { + EntityLogFD logFD = logFDEntry.getValue(); + try { + logFD.lock(); + logFD.flush(); + } finally { + logFD.unlock(); + } + } + } + } + } + + private class FlushTimerTask extends TimerTask { + @Override + public void run() { + try { + flush(); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug(e); + } + } + } + } + + private void cleanInActiveFDs() { + long currentTimeStamp = System.currentTimeMillis(); + try { + this.domainFDLocker.lock(); + if (domainLogFD != null) { + if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) { + domainLogFD.close(); + domainLogFD = null; + } + } + } finally { + this.domainFDLocker.unlock(); + } + + cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summanyLogFDs), + currentTimeStamp); + + cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs), + currentTimeStamp); + } + + private void cleanInActiveSummaryFDsforMap( + Map logFDs, + long currentTimeStamp) { + if (!logFDs.isEmpty()) { + for (Entry logFDEntry : logFDs + .entrySet()) { + EntityLogFD logFD = logFDEntry.getValue(); + try { + logFD.lock(); + if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) { + logFD.close(); + } + } finally { + logFD.unlock(); + } + } + } + } + + private void cleanInActiveEntityFDsforMap(Map> logFDs, + long currentTimeStamp) { + if (!logFDs.isEmpty()) { + for (Entry> logFDMapEntry + : logFDs.entrySet()) { + HashMap logFDMap + = logFDMapEntry.getValue(); + for (Entry logFDEntry + : logFDMap.entrySet()) { + EntityLogFD logFD = logFDEntry.getValue(); + try { + logFD.lock(); + if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) { + logFD.close(); + } + } finally { + logFD.unlock(); + } + } + } + } + } + + private class CleanInActiveFDsTask extends TimerTask { + @Override + public void run() { + try { + cleanInActiveFDs(); + } catch (Exception e) { + LOG.warn(e); + } + } + } + + @Override + public void close() throws IOException { + + serviceStopped = true; + + flushTimer.cancel(); + cleanInActiveFDsTimer.cancel(); + + try { + this.domainFDLocker.lock(); + if (domainLogFD != null) { + domainLogFD.close(); + domainLogFD = null; + } + } finally { + this.domainFDLocker.unlock(); + } + + closeSummaryFDs(summanyLogFDs); + + closeEntityFDs(entityLogFDs); + } + + private void closeEntityFDs(Map> logFDs) { + try { + entityTableLocker.lock(); + if (!logFDs.isEmpty()) { + for (Entry> logFDMapEntry : logFDs.entrySet()) { + HashMap logFDMap + = logFDMapEntry.getValue(); + for (Entry logFDEntry + : logFDMap.entrySet()) { + EntityLogFD logFD = logFDEntry.getValue(); + try { + logFD.lock(); + logFD.close(); + } finally { + logFD.unlock(); + } + } + } + } + } finally { + entityTableLocker.unlock(); + } + } + + private void closeSummaryFDs( + Map logFDs) { + try { + summaryTableLocker.lock(); + if (!logFDs.isEmpty()) { + for (Entry logFDEntry + : logFDs.entrySet()) { + EntityLogFD logFD = logFDEntry.getValue(); + try { + logFD.lock(); + logFD.close(); + } finally { + logFD.unlock(); + } + } + } + } finally { + summaryTableLocker.unlock(); + } + } + + public void writeDomainLog(FileSystem fs, Path logPath, + ObjectMapper objMapper, TimelineDomain domain, + boolean isAppendSupported) throws IOException { + try { + this.domainFDLocker.lock(); + if (this.domainLogFD != null) { + this.domainLogFD.writeDomain(domain); + } else { + this.domainLogFD = + new DomainLogFD(fs, logPath, objMapper, isAppendSupported); + this.domainLogFD.writeDomain(domain); + } + } finally { + this.domainFDLocker.unlock(); + } + } + + public void writeEntityLogs(FileSystem fs, Path entityLogPath, + ObjectMapper objMapper, ApplicationAttemptId appAttemptId, + TimelineEntityGroupId groupId, List entitiesToEntity, + boolean isAppendSupported) throws IOException{ + writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId, + groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs); + } + + private void writeEntityLogs(FileSystem fs, Path logPath, + ObjectMapper objMapper, ApplicationAttemptId attemptId, + TimelineEntityGroupId groupId, List entities, + boolean isAppendSupported, Map> logFDs) throws IOException { + HashMaplogMapFD + = logFDs.get(attemptId); + if (logMapFD != null) { + EntityLogFD logFD = logMapFD.get(groupId); + if (logFD != null) { + try { + logFD.lock(); + if (serviceStopped) { + return; + } + logFD.writeEntities(entities); + } finally { + logFD.unlock(); + } + } else { + createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId, + entities, isAppendSupported, logFDs); + } + } else { + createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId, + entities, isAppendSupported, logFDs); + } + } + + private void createEntityFDandWrite(FileSystem fs, Path logPath, + ObjectMapper objMapper, ApplicationAttemptId attemptId, + TimelineEntityGroupId groupId, List entities, + boolean isAppendSupported, Map> logFDs) throws IOException{ + try { + entityTableLocker.lock(); + if (serviceStopped) { + return; + } + HashMap logFDMap = + logFDs.get(attemptId); + if (logFDMap == null) { + logFDMap = new HashMap(); + } + EntityLogFD logFD = logFDMap.get(groupId); + if (logFD == null) { + logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported); + } + try { + logFD.lock(); + logFD.writeEntities(entities); + try { + entityTableCopyLocker.lock(); + logFDMap.put(groupId, logFD); + logFDs.put(attemptId, logFDMap); + } finally { + entityTableCopyLocker.unlock(); + } + } finally { + logFD.unlock(); + } + } finally { + entityTableLocker.unlock(); + } + } + + public void writeSummaryEntityLogs(FileSystem fs, Path logPath, + ObjectMapper objMapper, ApplicationAttemptId attemptId, + List entities, boolean isAppendSupported) + throws IOException { + writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities, + isAppendSupported, this.summanyLogFDs); + } + + private void writeSummmaryEntityLogs(FileSystem fs, Path logPath, + ObjectMapper objMapper, ApplicationAttemptId attemptId, + List entities, boolean isAppendSupported, + Map logFDs) throws IOException { + EntityLogFD logFD = null; + logFD = logFDs.get(attemptId); + if (logFD != null) { + try { + logFD.lock(); + if (serviceStopped) { + return; + } + logFD.writeEntities(entities); + } finally { + logFD.unlock(); + } + } else { + createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities, + isAppendSupported, logFDs); + } + } + + private void createSummaryFDAndWrite(FileSystem fs, Path logPath, + ObjectMapper objMapper, ApplicationAttemptId attemptId, + List entities, boolean isAppendSupported, + Map logFDs) throws IOException { + try { + summaryTableLocker.lock(); + if (serviceStopped) { + return; + } + EntityLogFD logFD = logFDs.get(attemptId); + if (logFD == null) { + logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported); + } + try { + logFD.lock(); + logFD.writeEntities(entities); + try { + summaryTableCopyLocker.lock(); + logFDs.put(attemptId, logFD); + } finally { + summaryTableCopyLocker.unlock(); + } + } finally { + logFD.unlock(); + } + } finally { + summaryTableLocker.unlock(); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 019c7a5a119..195a6617f90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -33,8 +33,6 @@ import java.security.PrivilegedExceptionAction; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; -import javax.ws.rs.core.MediaType; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -54,19 +52,19 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.map.ObjectMapper; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -74,7 +72,6 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientRequest; import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; @@ -110,6 +107,9 @@ public class TimelineClientImpl extends TimelineClient { private URI resURI; private UserGroupInformation authUgi; private String doAsUser; + private Configuration configuration; + private float timelineServiceVersion; + private TimelineWriter timelineWriter; @Private @VisibleForTesting @@ -254,6 +254,7 @@ public class TimelineClientImpl extends TimelineClient { } protected void serviceInit(Configuration conf) throws Exception { + this.configuration = conf; UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation realUgi = ugi.getRealUser(); if (realUgi != null) { @@ -293,58 +294,48 @@ public class TimelineClientImpl extends TimelineClient { RESOURCE_URI_STR)); } LOG.info("Timeline service address: " + resURI); + timelineServiceVersion = + conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); super.serviceInit(conf); } + @Override + protected void serviceStart() throws Exception { + timelineWriter = createTimelineWriter( + configuration, authUgi, client, resURI); + } + + protected TimelineWriter createTimelineWriter(Configuration conf, + UserGroupInformation ugi, Client webClient, URI uri) + throws IOException { + if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) { + return new FileSystemTimelineWriter( + conf, ugi, webClient, uri); + } else { + return new DirectTimelineWriter(ugi, webClient, uri); + } + } + + @Override + protected void serviceStop() throws Exception { + if (this.timelineWriter != null) { + this.timelineWriter.close(); + } + super.serviceStop(); + } + @Override public TimelinePutResponse putEntities( TimelineEntity... entities) throws IOException, YarnException { - TimelineEntities entitiesContainer = new TimelineEntities(); - for (TimelineEntity entity : entities) { - if (entity.getEntityId() == null || entity.getEntityType() == null) { - throw new YarnException("Incomplete entity without entity id/type"); - } - entitiesContainer.addEntity(entity); - } - ClientResponse resp = doPosting(entitiesContainer, null); - return resp.getEntity(TimelinePutResponse.class); + return timelineWriter.putEntities(entities); } @Override public void putDomain(TimelineDomain domain) throws IOException, YarnException { - doPosting(domain, "domain"); - } - - private ClientResponse doPosting(final Object obj, final String path) - throws IOException, YarnException { - ClientResponse resp; - try { - resp = authUgi.doAs(new PrivilegedExceptionAction() { - @Override - public ClientResponse run() throws Exception { - return doPostingObject(obj, path); - } - }); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } catch (InterruptedException ie) { - throw new IOException(ie); - } - if (resp == null || - resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response : \n" + output); - } - throw new YarnException(msg); - } - return resp; + timelineWriter.putDomain(domain); } @SuppressWarnings("unchecked") @@ -470,23 +461,6 @@ public class TimelineClientImpl extends TimelineClient { return connectionRetry.retryOn(tokenRetryOp); } - @Private - @VisibleForTesting - public ClientResponse doPostingObject(Object object, String path) { - WebResource webResource = client.resource(resURI); - if (path == null) { - return webResource.accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON) - .post(ClientResponse.class, object); - } else if (path.equals("domain")) { - return webResource.path(path).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON) - .put(ClientResponse.class, object); - } else { - throw new YarnRuntimeException("Unknown resource type"); - } - } - private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { @@ -663,4 +637,34 @@ public class TimelineClientImpl extends TimelineClient { public UserGroupInformation getUgi() { return authUgi; } + + @Override + public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, + TimelineEntityGroupId groupId, TimelineEntity... entities) + throws IOException, YarnException { + if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) { + throw new YarnException( + "This API is not supported under current Timeline Service Version: " + + timelineServiceVersion); + } + + return timelineWriter.putEntities(appAttemptId, groupId, entities); + } + + @Override + public void putDomain(ApplicationAttemptId appAttemptId, + TimelineDomain domain) throws IOException, YarnException { + if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) { + throw new YarnException( + "This API is not supported under current Timeline Service Version: " + + timelineServiceVersion); + } + timelineWriter.putDomain(appAttemptId, domain); + } + + @Private + @VisibleForTesting + public void setTimelineWriter(TimelineWriter writer) { + this.timelineWriter = writer; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java new file mode 100644 index 00000000000..c616e639365 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import javax.ws.rs.core.MediaType; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +/** + * Base writer class to write the Timeline data. + */ +@Private +@Unstable +public abstract class TimelineWriter { + + private static final Log LOG = LogFactory + .getLog(TimelineWriter.class); + + private UserGroupInformation authUgi; + private Client client; + private URI resURI; + + public TimelineWriter(UserGroupInformation authUgi, Client client, + URI resURI) { + this.authUgi = authUgi; + this.client = client; + this.resURI = resURI; + } + + public void close() throws Exception { + // DO NOTHING + } + + public TimelinePutResponse putEntities( + TimelineEntity... entities) throws IOException, YarnException { + TimelineEntities entitiesContainer = new TimelineEntities(); + for (TimelineEntity entity : entities) { + if (entity.getEntityId() == null || entity.getEntityType() == null) { + throw new YarnException("Incomplete entity without entity id/type"); + } + entitiesContainer.addEntity(entity); + } + ClientResponse resp = doPosting(entitiesContainer, null); + return resp.getEntity(TimelinePutResponse.class); + } + + public void putDomain(TimelineDomain domain) throws IOException, + YarnException { + doPosting(domain, "domain"); + } + + public abstract TimelinePutResponse putEntities( + ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, + TimelineEntity... entities) throws IOException, YarnException; + + public abstract void putDomain(ApplicationAttemptId appAttemptId, + TimelineDomain domain) throws IOException, YarnException; + + private ClientResponse doPosting(final Object obj, final String path) + throws IOException, YarnException { + ClientResponse resp; + try { + resp = authUgi.doAs(new PrivilegedExceptionAction() { + @Override + public ClientResponse run() throws Exception { + return doPostingObject(obj, path); + } + }); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled() && resp != null) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } + throw new YarnException(msg); + } + return resp; + } + + @Private + @VisibleForTesting + public ClientResponse doPostingObject(Object object, String path) { + WebResource webResource = client.resource(resURI); + if (path == null) { + return webResource.accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, object); + } else if (path.equals("domain")) { + return webResource.path(path).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .put(ClientResponse.class, object); + } else { + throw new YarnRuntimeException("Unknown resource type"); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java new file mode 100644 index 00000000000..55b149640d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.junit.Assert; +import org.junit.Test; + +public class TestTimelineEntityGroupId { + + @Test + public void testTimelineEntityGroupId() { + ApplicationId appId1 = ApplicationId.newInstance(1234, 1); + ApplicationId appId2 = ApplicationId.newInstance(1234, 2); + TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1"); + TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2"); + TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1"); + TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1"); + + Assert.assertTrue(group1.equals(group4)); + Assert.assertFalse(group1.equals(group2)); + Assert.assertFalse(group1.equals(group3)); + + Assert.assertTrue(group1.compareTo(group4) == 0); + Assert.assertTrue(group1.compareTo(group2) < 0); + Assert.assertTrue(group1.compareTo(group3) < 0); + + Assert.assertTrue(group1.hashCode() == group4.hashCode()); + Assert.assertFalse(group1.hashCode() == group2.hashCode()); + Assert.assertFalse(group1.hashCode() == group3.hashCode()); + + Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString()); + Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java index 4c74c618ac1..39fc8deb3ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -25,8 +25,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.IOException; import java.net.ConnectException; +import java.net.URI; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; @@ -37,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; @@ -46,17 +48,20 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; public class TestTimelineClient { private TimelineClientImpl client; + private TimelineWriter spyTimelineWriter; @Before public void setup() { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); client = createTimelineClient(conf); } @@ -69,7 +74,8 @@ public class TestTimelineClient { @Test public void testPostEntities() throws Exception { - mockEntityClientResponse(client, ClientResponse.Status.OK, false, false); + mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, + false, false); try { TimelinePutResponse response = client.putEntities(generateEntity()); Assert.assertEquals(0, response.getErrors().size()); @@ -80,7 +86,8 @@ public class TestTimelineClient { @Test public void testPostEntitiesWithError() throws Exception { - mockEntityClientResponse(client, ClientResponse.Status.OK, true, false); + mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, true, + false); try { TimelinePutResponse response = client.putEntities(generateEntity()); Assert.assertEquals(1, response.getErrors().size()); @@ -106,8 +113,8 @@ public class TestTimelineClient { @Test public void testPostEntitiesNoResponse() throws Exception { - mockEntityClientResponse( - client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + mockEntityClientResponse(spyTimelineWriter, + ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); try { client.putEntities(generateEntity()); Assert.fail("Exception is expected"); @@ -119,7 +126,7 @@ public class TestTimelineClient { @Test public void testPostEntitiesConnectionRefused() throws Exception { - mockEntityClientResponse(client, null, false, true); + mockEntityClientResponse(spyTimelineWriter, null, false, true); try { client.putEntities(generateEntity()); Assert.fail("RuntimeException is expected"); @@ -130,7 +137,7 @@ public class TestTimelineClient { @Test public void testPutDomain() throws Exception { - mockDomainClientResponse(client, ClientResponse.Status.OK, false); + mockDomainClientResponse(spyTimelineWriter, ClientResponse.Status.OK, false); try { client.putDomain(generateDomain()); } catch (YarnException e) { @@ -140,7 +147,8 @@ public class TestTimelineClient { @Test public void testPutDomainNoResponse() throws Exception { - mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false); + mockDomainClientResponse(spyTimelineWriter, + ClientResponse.Status.FORBIDDEN, false); try { client.putDomain(generateDomain()); Assert.fail("Exception is expected"); @@ -152,7 +160,7 @@ public class TestTimelineClient { @Test public void testPutDomainConnectionRefused() throws Exception { - mockDomainClientResponse(client, null, true); + mockDomainClientResponse(spyTimelineWriter, null, true); try { client.putDomain(generateDomain()); Assert.fail("RuntimeException is expected"); @@ -291,15 +299,16 @@ public class TestTimelineClient { } private static ClientResponse mockEntityClientResponse( - TimelineClientImpl client, ClientResponse.Status status, + TimelineWriter spyTimelineWriter, ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) { ClientResponse response = mock(ClientResponse.class); if (hasRuntimeError) { - doThrow(new ClientHandlerException(new ConnectException())).when(client) - .doPostingObject(any(TimelineEntities.class), any(String.class)); + doThrow(new ClientHandlerException(new ConnectException())).when( + spyTimelineWriter).doPostingObject( + any(TimelineEntities.class), any(String.class)); return response; } - doReturn(response).when(client) + doReturn(response).when(spyTimelineWriter) .doPostingObject(any(TimelineEntities.class), any(String.class)); when(response.getClientResponseStatus()).thenReturn(status); TimelinePutResponse.TimelinePutError error = @@ -316,15 +325,16 @@ public class TestTimelineClient { } private static ClientResponse mockDomainClientResponse( - TimelineClientImpl client, ClientResponse.Status status, + TimelineWriter spyTimelineWriter, ClientResponse.Status status, boolean hasRuntimeError) { ClientResponse response = mock(ClientResponse.class); if (hasRuntimeError) { - doThrow(new ClientHandlerException(new ConnectException())).when(client) - .doPostingObject(any(TimelineDomain.class), any(String.class)); + doThrow(new ClientHandlerException(new ConnectException())).when( + spyTimelineWriter).doPostingObject(any(TimelineDomain.class), + any(String.class)); return response; } - doReturn(response).when(client) + doReturn(response).when(spyTimelineWriter) .doPostingObject(any(TimelineDomain.class), any(String.class)); when(response.getClientResponseStatus()).thenReturn(status); return response; @@ -365,10 +375,19 @@ public class TestTimelineClient { return domain; } - private static TimelineClientImpl createTimelineClient( + private TimelineClientImpl createTimelineClient( YarnConfiguration conf) { - TimelineClientImpl client = - spy((TimelineClientImpl) TimelineClient.createTimelineClient()); + TimelineClientImpl client = new TimelineClientImpl() { + @Override + protected TimelineWriter createTimelineWriter(Configuration conf, + UserGroupInformation authUgi, Client client, URI resURI) + throws IOException { + TimelineWriter timelineWriter = + new DirectTimelineWriter(authUgi, client, resURI); + spyTimelineWriter = spy(timelineWriter); + return spyTimelineWriter; + } + }; client.init(conf); client.start(); return client; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java new file mode 100644 index 00000000000..37eadbfa8dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.reset; + +import java.io.File; +import java.io.IOException; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; + +public class TestTimelineClientForATS1_5 { + + protected static Log LOG = LogFactory + .getLog(TestTimelineClientForATS1_5.class); + + private TimelineClientImpl client; + private static FileContext localFS; + private static File localActiveDir; + private TimelineWriter spyTimelineWriter; + + @Before + public void setup() throws Exception { + localFS = FileContext.getLocalFSFileContext(); + localActiveDir = + new File("target", this.getClass().getSimpleName() + "-activeDir") + .getAbsoluteFile(); + localFS.delete(new Path(localActiveDir.getAbsolutePath()), true); + localActiveDir.mkdir(); + LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath()); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f); + conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, + localActiveDir.getAbsolutePath()); + conf.set( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES, + "summary_type"); + client = createTimelineClient(conf); + } + + @After + public void tearDown() throws Exception { + if (client != null) { + client.stop(); + } + localFS.delete(new Path(localActiveDir.getAbsolutePath()), true); + } + + @Test + public void testPostEntities() throws Exception { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + TimelineEntityGroupId groupId = + TimelineEntityGroupId.newInstance(appId, "1"); + TimelineEntityGroupId groupId2 = + TimelineEntityGroupId.newInstance(appId, "2"); + // Create two entities, includes an entity type and a summary type + TimelineEntity[] entities = new TimelineEntity[2]; + entities[0] = generateEntity("entity_type"); + entities[1] = generateEntity("summary_type"); + try { + // if attemptid is null, fall back to the original putEntities call, and + // save the entity + // into configured levelDB store + client.putEntities(null, null, entities); + verify(spyTimelineWriter, times(1)).putEntities(entities); + reset(spyTimelineWriter); + + // if the attemptId is specified, but groupId is given as null, it would + // fall back to the original putEntities call if we have the entity type. + // the entity which is summary type would be written into FS + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId, 1); + client.putEntities(attemptId1, null, entities); + TimelineEntity[] entityTDB = new TimelineEntity[1]; + entityTDB[0] = entities[0]; + verify(spyTimelineWriter, times(1)).putEntities(entityTDB); + Assert.assertTrue(localFS.util().exists( + new Path(getAppAttemptDir(attemptId1), "summarylog-" + + attemptId1.toString()))); + reset(spyTimelineWriter); + + // if we specified attemptId as well as groupId, it would save the entity + // into + // FileSystem instead of levelDB store + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId, 2); + client.putEntities(attemptId2, groupId, entities); + client.putEntities(attemptId2, groupId2, entities); + verify(spyTimelineWriter, times(0)).putEntities( + any(TimelineEntity[].class)); + Assert.assertTrue(localFS.util().exists( + new Path(getAppAttemptDir(attemptId2), "summarylog-" + + attemptId2.toString()))); + Assert.assertTrue(localFS.util().exists( + new Path(getAppAttemptDir(attemptId2), "entitylog-" + + groupId.toString()))); + Assert.assertTrue(localFS.util().exists( + new Path(getAppAttemptDir(attemptId2), "entitylog-" + + groupId2.toString()))); + reset(spyTimelineWriter); + } catch (Exception e) { + Assert.fail("Exception is not expected. " + e); + } + } + + @Test + public void testPutDomain() { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId, 1); + try { + TimelineDomain domain = generateDomain(); + + client.putDomain(null, domain); + verify(spyTimelineWriter, times(1)).putDomain(domain); + reset(spyTimelineWriter); + + client.putDomain(attemptId1, domain); + verify(spyTimelineWriter, times(0)).putDomain(domain); + Assert.assertTrue(localFS.util().exists( + new Path(getAppAttemptDir(attemptId1), "domainlog-" + + attemptId1.toString()))); + reset(spyTimelineWriter); + } catch (Exception e) { + Assert.fail("Exception is not expected." + e); + } + } + + private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) { + Path appDir = + new Path(localActiveDir.getAbsolutePath(), appAttemptId + .getApplicationId().toString()); + Path attemptDir = new Path(appDir, appAttemptId.toString()); + return attemptDir; + } + + private static TimelineEntity generateEntity(String type) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId("entity id"); + entity.setEntityType(type); + entity.setStartTime(System.currentTimeMillis()); + return entity; + } + + private static TimelineDomain generateDomain() { + TimelineDomain domain = new TimelineDomain(); + domain.setId("namesapce id"); + domain.setDescription("domain description"); + domain.setOwner("domain owner"); + domain.setReaders("domain_reader"); + domain.setWriters("domain_writer"); + domain.setCreatedTime(0L); + domain.setModifiedTime(1L); + return domain; + } + + private TimelineClientImpl createTimelineClient(YarnConfiguration conf) { + TimelineClientImpl client = new TimelineClientImpl() { + @Override + protected TimelineWriter createTimelineWriter(Configuration conf, + UserGroupInformation authUgi, Client client, URI resURI) + throws IOException { + TimelineWriter timelineWriter = + new FileSystemTimelineWriter(conf, authUgi, client, resURI) { + public ClientResponse doPostingObject(Object object, String path) { + ClientResponse response = mock(ClientResponse.class); + when(response.getClientResponseStatus()).thenReturn( + ClientResponse.Status.OK); + return response; + } + }; + spyTimelineWriter = spy(timelineWriter); + return spyTimelineWriter; + } + }; + + client.init(conf); + client.start(); + return client; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java index 5cb1baa37d3..46d5b6b873c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java @@ -19,15 +19,20 @@ package org.apache.hadoop.yarn.server.timeline.webapp; import java.io.File; +import java.io.IOException; +import java.net.URI; import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; @@ -39,6 +44,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; public class TestTimelineWebServicesWithSSL { @@ -60,6 +66,7 @@ public class TestTimelineWebServicesWithSSL { conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, MemoryTimelineStore.class, TimelineStore.class); conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY"); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); File base = new File(BASEDIR); FileUtil.fullyDelete(base); @@ -123,11 +130,17 @@ public class TestTimelineWebServicesWithSSL { private ClientResponse resp; @Override - public ClientResponse doPostingObject(Object obj, String path) { - resp = super.doPostingObject(obj, path); - return resp; + protected TimelineWriter createTimelineWriter(Configuration conf, + UserGroupInformation authUgi, Client client, URI resURI) + throws IOException { + return new DirectTimelineWriter(authUgi, client, resURI) { + @Override + public ClientResponse doPostingObject(Object obj, String path) { + resp = super.doPostingObject(obj, path); + return resp; + } + }; } - } } diff --git a/q b/q new file mode 100644 index 00000000000..15ece5a017c --- /dev/null +++ b/q @@ -0,0 +1,222 @@ + + SSUUMMMMAARRYY OOFF LLEESSSS CCOOMMMMAANNDDSS + + Commands marked with * may be preceded by a number, _N. + Notes in parentheses indicate the behavior if _N is given. + + h H Display this help. + q :q Q :Q ZZ Exit. + --------------------------------------------------------------------------- + + MMOOVVIINNGG + + e ^E j ^N CR * Forward one line (or _N lines). + y ^Y k ^K ^P * Backward one line (or _N lines). + f ^F ^V SPACE * Forward one window (or _N lines). + b ^B ESC-v * Backward one window (or _N lines). + z * Forward one window (and set window to _N). + w * Backward one window (and set window to _N). + ESC-SPACE * Forward one window, but don't stop at end-of-file. + d ^D * Forward one half-window (and set half-window to _N). + u ^U * Backward one half-window (and set half-window to _N). + ESC-) RightArrow * Left one half screen width (or _N positions). + ESC-( LeftArrow * Right one half screen width (or _N positions). + F Forward forever; like "tail -f". + r ^R ^L Repaint screen. + R Repaint screen, discarding buffered input. + --------------------------------------------------- + Default "window" is the screen height. + Default "half-window" is half of the screen height. + --------------------------------------------------------------------------- + + SSEEAARRCCHHIINNGG + + /_p_a_t_t_e_r_n * Search forward for (_N-th) matching line. + ?_p_a_t_t_e_r_n * Search backward for (_N-th) matching line. + n * Repeat previous search (for _N-th occurrence). + N * Repeat previous search in reverse direction. + ESC-n * Repeat previous search, spanning files. + ESC-N * Repeat previous search, reverse dir. & spanning files. + ESC-u Undo (toggle) search highlighting. + &_p_a_t_t_e_r_n * Display only matching lines + --------------------------------------------------- + Search patterns may be modified by one or more of: + ^N or ! Search for NON-matching lines. + ^E or * Search multiple files (pass thru END OF FILE). + ^F or @ Start search at FIRST file (for /) or last file (for ?). + ^K Highlight matches, but don't move (KEEP position). + ^R Don't use REGULAR EXPRESSIONS. + --------------------------------------------------------------------------- + + JJUUMMPPIINNGG + + g < ESC-< * Go to first line in file (or line _N). + G > ESC-> * Go to last line in file (or line _N). + p % * Go to beginning of file (or _N percent into file). + t * Go to the (_N-th) next tag. + T * Go to the (_N-th) previous tag. + { ( [ * Find close bracket } ) ]. + } ) ] * Find open bracket { ( [. + ESC-^F _<_c_1_> _<_c_2_> * Find close bracket _<_c_2_>. + ESC-^B _<_c_1_> _<_c_2_> * Find open bracket _<_c_1_> + --------------------------------------------------- + Each "find close bracket" command goes forward to the close bracket + matching the (_N-th) open bracket in the top line. + Each "find open bracket" command goes backward to the open bracket + matching the (_N-th) close bracket in the bottom line. + + m_<_l_e_t_t_e_r_> Mark the current position with . + '_<_l_e_t_t_e_r_> Go to a previously marked position. + '' Go to the previous position. + ^X^X Same as '. + --------------------------------------------------- + A mark is any upper-case or lower-case letter. + Certain marks are predefined: + ^ means beginning of the file + $ means end of the file + --------------------------------------------------------------------------- + + CCHHAANNGGIINNGG FFIILLEESS + + :e [_f_i_l_e] Examine a new file. + ^X^V Same as :e. + :n * Examine the (_N-th) next file from the command line. + :p * Examine the (_N-th) previous file from the command line. + :x * Examine the first (or _N-th) file from the command line. + :d Delete the current file from the command line list. + = ^G :f Print current file name. + --------------------------------------------------------------------------- + + MMIISSCCEELLLLAANNEEOOUUSS CCOOMMMMAANNDDSS + + -_<_f_l_a_g_> Toggle a command line option [see OPTIONS below]. + --_<_n_a_m_e_> Toggle a command line option, by name. + __<_f_l_a_g_> Display the setting of a command line option. + ___<_n_a_m_e_> Display the setting of an option, by name. + +_c_m_d Execute the less cmd each time a new file is examined. + + !_c_o_m_m_a_n_d Execute the shell command with $SHELL. + |XX_c_o_m_m_a_n_d Pipe file between current pos & mark XX to shell command. + v Edit the current file with $VISUAL or $EDITOR. + V Print version number of "less". + --------------------------------------------------------------------------- + + OOPPTTIIOONNSS + + Most options may be changed either on the command line, + or from within less by using the - or -- command. + Options may be given in one of two forms: either a single + character preceded by a -, or a name preceeded by --. + + -? ........ --help + Display help (from command line). + -a ........ --search-skip-screen + Forward search skips current screen. + -b [_N] .... --buffers=[_N] + Number of buffers. + -B ........ --auto-buffers + Don't automatically allocate buffers for pipes. + -c ........ --clear-screen + Repaint by clearing rather than scrolling. + -d ........ --dumb + Dumb terminal. + -D [_x_n_._n] . --color=_x_n_._n + Set screen colors. (MS-DOS only) + -e -E .... --quit-at-eof --QUIT-AT-EOF + Quit at end of file. + -f ........ --force + Force open non-regular files. + -F ........ --quit-if-one-screen + Quit if entire file fits on first screen. + -g ........ --hilite-search + Highlight only last match for searches. + -G ........ --HILITE-SEARCH + Don't highlight any matches for searches. + -h [_N] .... --max-back-scroll=[_N] + Backward scroll limit. + -i ........ --ignore-case + Ignore case in searches that do not contain uppercase. + -I ........ --IGNORE-CASE + Ignore case in all searches. + -j [_N] .... --jump-target=[_N] + Screen position of target lines. + -J ........ --status-column + Display a status column at left edge of screen. + -k [_f_i_l_e] . --lesskey-file=[_f_i_l_e] + Use a lesskey file. + -L ........ --no-lessopen + Ignore the LESSOPEN environment variable. + -m -M .... --long-prompt --LONG-PROMPT + Set prompt style. + -n -N .... --line-numbers --LINE-NUMBERS + Don't use line numbers. + -o [_f_i_l_e] . --log-file=[_f_i_l_e] + Copy to log file (standard input only). + -O [_f_i_l_e] . --LOG-FILE=[_f_i_l_e] + Copy to log file (unconditionally overwrite). + -p [_p_a_t_t_e_r_n] --pattern=[_p_a_t_t_e_r_n] + Start at pattern (from command line). + -P [_p_r_o_m_p_t] --prompt=[_p_r_o_m_p_t] + Define new prompt. + -q -Q .... --quiet --QUIET --silent --SILENT + Quiet the terminal bell. + -r -R .... --raw-control-chars --RAW-CONTROL-CHARS + Output "raw" control characters. + -s ........ --squeeze-blank-lines + Squeeze multiple blank lines. + -S ........ --chop-long-lines + Chop long lines. + -t [_t_a_g] .. --tag=[_t_a_g] + Find a tag. + -T [_t_a_g_s_f_i_l_e] --tag-file=[_t_a_g_s_f_i_l_e] + Use an alternate tags file. + -u -U .... --underline-special --UNDERLINE-SPECIAL + Change handling of backspaces. + -V ........ --version + Display the version number of "less". + -w ........ --hilite-unread + Highlight first new line after forward-screen. + -W ........ --HILITE-UNREAD + Highlight first new line after any forward movement. + -x [_N[,...]] --tabs=[_N[,...]] + Set tab stops. + -X ........ --no-init + Don't use termcap init/deinit strings. + --no-keypad + Don't use termcap keypad init/deinit strings. + -y [_N] .... --max-forw-scroll=[_N] + Forward scroll limit. + -z [_N] .... --window=[_N] + Set size of window. + -" [_c[_c]] . --quotes=[_c[_c]] + Set shell quote characters. + -~ ........ --tilde + Don't display tildes after end of file. + -# [_N] .... --shift=[_N] + Horizontal scroll amount (0 = one half screen width) + + --------------------------------------------------------------------------- + + LLIINNEE EEDDIITTIINNGG + + These keys can be used to edit text being entered + on the "command line" at the bottom of the screen. + + RightArrow ESC-l Move cursor right one character. + LeftArrow ESC-h Move cursor left one character. + CNTL-RightArrow ESC-RightArrow ESC-w Move cursor right one word. + CNTL-LeftArrow ESC-LeftArrow ESC-b Move cursor left one word. + HOME ESC-0 Move cursor to start of line. + END ESC-$ Move cursor to end of line. + BACKSPACE Delete char to left of cursor. + DELETE ESC-x Delete char under cursor. + CNTL-BACKSPACE ESC-BACKSPACE Delete word to left of cursor. + CNTL-DELETE ESC-DELETE ESC-X Delete word under cursor. + CNTL-U ESC (MS-DOS only) Delete entire line. + UpArrow ESC-k Retrieve previous command line. + DownArrow ESC-j Retrieve next command line. + TAB Complete filename & cycle. + SHIFT-TAB ESC-TAB Complete filename & reverse cycle. + CNTL-L Complete filename, list all. + +