+ Iterator<String> it = SPLITTER.split(timelineEntityGroupIdStr).iterator();
+ if (!it.next().equals(TIMELINE_ENTITY_GROUPID_STR_PREFIX)) {
+ throw new IllegalArgumentException(
+ "Invalid TimelineEntityGroupId prefix: " + timelineEntityGroupIdStr);
+ }
+ ApplicationId appId =
+ ApplicationId.newInstance(Long.parseLong(it.next()),
+ Integer.parseInt(it.next()));
+ buf.append(it.next());
+ while (it.hasNext()) {
+ buf.append("_");
+ buf.append(it.next());
+ }
+ return TimelineEntityGroupId.newInstance(appId, buf.toString());
+ }
+}
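
For orientation: the string form produced by TimelineEntityGroupId#toString()
and parsed by the fromString() logic above is
"timelineEntityGroupId_<clusterTimestamp>_<appSequenceNumber>_<groupId>". The
group id itself may contain underscores, which is why the parser re-joins all
remaining tokens after the application id. A minimal round-trip sketch (the
sample ids are illustrative):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;

    ApplicationId appId = ApplicationId.newInstance(1234L, 1);
    // Group ids may embed underscores; fromString() re-assembles them.
    TimelineEntityGroupId groupId =
        TimelineEntityGroupId.newInstance(appId, "dag_1");
    String str = groupId.toString(); // "timelineEntityGroupId_1234_1_dag_1"
    TimelineEntityGroupId parsed = TimelineEntityGroupId.fromString(str);
    assert groupId.equals(parsed);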
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6ff9fd81fe2..10dfa2eca27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1581,6 +1581,10 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-web-path.";
+ /** Timeline client settings. */
+ public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
+ TIMELINE_SERVICE_PREFIX + "client.";
+
/**
* Path to war file or static content directory for this UI
* (For pluggable UIs).
@@ -1588,6 +1592,45 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
+ /**
+ * Settings for timeline service v1.5.
+ */
+ public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX =
+ TIMELINE_SERVICE_PREFIX + "entity-group-fs-store.";
+
+ public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "active-dir";
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
+ "/tmp/entity-file-history/active";
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+ public static final String
+ DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+ "2000, 500";
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
+
+ public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
+ public static final long
+ TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10;
+
+ public static final String TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "fd-clean-interval-secs";
+ public static final long
+ TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60;
+
+ public static final String TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS =
+ TIMELINE_SERVICE_CLIENT_PREFIX + "fd-retain-secs";
+ public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
+ 5*60;
+
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
@@ -1628,8 +1671,8 @@ public class YarnConfiguration extends Configuration {
public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type";
@Private
- public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
- "none";
+ public static final String
+ DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = "none";
/** The setting that controls whether timeline service is enabled or not. */
public static final String TIMELINE_SERVICE_ENABLED =
@@ -1678,7 +1721,7 @@ public class YarnConfiguration extends Configuration {
APPLICATION_HISTORY_PREFIX + "max-applications";
public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000;
- /** Timeline service store class */
+ /** Timeline service store class. */
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
@@ -1791,10 +1834,6 @@ public class YarnConfiguration extends Configuration {
public static final boolean
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
- /** Timeline client settings */
- public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
- TIMELINE_SERVICE_PREFIX + "client.";
-
/** Timeline client call, max retries (-1 means no limit) */
public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES =
TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries";
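
The new keys are set like any other YARN configuration entry. A sketch that
simply restates the defaults defined above (the summary entity types listed
are illustrative values, not defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    Configuration conf = new YarnConfiguration();
    // Directory under which the writer creates per-app, per-attempt dirs.
    conf.set(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
        "/tmp/entity-file-history/active");
    // Entity types routed to the summary log instead of per-group logs.
    conf.set(YarnConfiguration
        .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
        "YARN_APPLICATION,YARN_CONTAINER");
    // FD cache: flush every 10s, scan every 60s, close FDs idle for 5 min.
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS, 10);
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS, 60);
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS, 5 * 60);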
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index a3766f9e69e..258b9f5c326 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -26,8 +26,10 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,6 +80,28 @@ public abstract class TimelineClient extends AbstractService {
public abstract TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException;
+ /**
+ * Send the information of a number of conceptual entities to the timeline
+ * server. It is a blocking API. The method will not return until it gets the
+ * response from the timeline server.
+ *
+ * This API is only for timeline service v1.5.
+ *
+ * @param appAttemptId {@link ApplicationAttemptId}
+ * @param groupId {@link TimelineEntityGroupId}
+ * @param entities
+ * the collection of {@link TimelineEntity}
+ * @return the error information if the sent entities are not correctly stored
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract TimelinePutResponse putEntities(
+ ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException;
+
/**
*
* Send the information of a domain to the timeline server. It is a
@@ -94,6 +118,25 @@ public abstract class TimelineClient extends AbstractService {
public abstract void putDomain(
TimelineDomain domain) throws IOException, YarnException;
+ /**
+ * Send the information of a domain to the timeline server. It is a
+ * blocking API. The method will not return until it gets the response from
+ * the timeline server.
+ *
+ * This API is only for timeline service v1.5.
+ *
+ * @param appAttemptId {@link ApplicationAttemptId}
+ * @param domain
+ * a {@link TimelineDomain} object
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if the domain is incomplete/invalid
+ */
+ @Public
+ public abstract void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException;
+
/**
*
* Get a delegation token so as to be able to talk to the timeline server in a
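
A usage sketch for the two new blocking calls, assuming a cluster configured
with yarn.timeline-service.version set to 1.5 and the record classes imported
above (the entity, group, and domain ids below are illustrative):

    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();

    ApplicationId appId = ApplicationId.newInstance(1234L, 1);
    ApplicationAttemptId attemptId =
        ApplicationAttemptId.newInstance(appId, 1);
    TimelineEntityGroupId groupId =
        TimelineEntityGroupId.newInstance(appId, "group_1");

    TimelineEntity entity = new TimelineEntity();
    entity.setEntityId("entity_1");
    entity.setEntityType("SOME_ENTITY_TYPE");
    TimelinePutResponse response =
        client.putEntities(attemptId, groupId, entity);

    TimelineDomain domain = new TimelineDomain();
    domain.setId("domain_1");
    client.putDomain(attemptId, domain);

    client.stop();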
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
new file mode 100644
index 00000000000..abc2a280708
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * A simple writer class that posts the timeline data directly to the
+ * timeline server.
+ */
+@Private
+@Unstable
+public class DirectTimelineWriter extends TimelineWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(DirectTimelineWriter.class);
+
+ public DirectTimelineWriter(UserGroupInformation authUgi,
+ Client client, URI resURI) {
+ super(authUgi, client, resURI);
+ }
+
+ @Override
+ public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId, TimelineEntity... entities)
+ throws IOException, YarnException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException {
+ throw new IOException("Not supported");
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
new file mode 100644
index 00000000000..1c295e17e39
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -0,0 +1,847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * A simple writer class for storing Timeline data in any storage that
+ * implements a basic FileSystem interface.
+ * This writer is used for ATSv1.5.
+ */
+@Private
+@Unstable
+public class FileSystemTimelineWriter extends TimelineWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemTimelineWriter.class);
+
+ // This is a temporary solution. The configuration will be deleted once we
+ // have the FileSystem API to check whether the append operation is supported.
+ private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ = YarnConfiguration.TIMELINE_SERVICE_PREFIX
+ + "entity-file.fs-support-append";
+
+ // The app log directory must be readable by the group so the server can
+ // access the logs, and writable by the group so the server can delete them
+ private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+ // Logs must be readable by group so server can access them
+ private static final short FILE_LOG_PERMISSIONS = 0640;
+ private static final String DOMAIN_LOG_PREFIX = "domainlog-";
+ private static final String SUMMARY_LOG_PREFIX = "summarylog-";
+ private static final String ENTITY_LOG_PREFIX = "entitylog-";
+
+ private Path activePath = null;
+ private FileSystem fs = null;
+ private Set<String> summaryEntityTypes;
+ private ObjectMapper objMapper = null;
+ private long flushIntervalSecs;
+ private long cleanIntervalSecs;
+ private long ttl;
+ private LogFDsCache logFDsCache = null;
+ private boolean isAppendSupported;
+
+ public FileSystemTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
+ super(authUgi, client, resURI);
+
+ Configuration fsConf = new Configuration(conf);
+ fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+ String retryPolicy =
+ fsConf.get(YarnConfiguration.
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
+ YarnConfiguration.
+ DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
+ fsConf.set("dfs.client.retry.policy.spec", retryPolicy);
+
+ activePath = new Path(fsConf.get(
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+
+ String scheme = activePath.toUri().getScheme();
+ if (scheme == null) {
+ scheme = FileSystem.getDefaultUri(fsConf).getScheme();
+ }
+ if (scheme != null) {
+ String disableCacheName = String.format("fs.%s.impl.disable.cache",
+ scheme);
+ fsConf.setBoolean(disableCacheName, true);
+ }
+
+ fs = activePath.getFileSystem(fsConf);
+ if (!fs.exists(activePath)) {
+ throw new IOException(activePath + " does not exist");
+ }
+
+ summaryEntityTypes = new HashSet<String>(
+ conf.getStringCollection(YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES));
+
+ flushIntervalSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT);
+
+ cleanIntervalSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT);
+
+ ttl = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
+
+ logFDsCache =
+ new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
+
+ this.isAppendSupported =
+ conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+
+ objMapper = createObjectMapper();
+
+ if (LOG.isDebugEnabled()) {
+ StringBuilder debugMSG = new StringBuilder();
+ debugMSG.append(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS
+ + "=" + flushIntervalSecs + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS
+ + "=" + cleanIntervalSecs + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
+ + "=" + ttl + ", " +
+ TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ + "=" + isAppendSupported + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ + "=" + activePath);
+
+ if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) {
+ debugMSG.append(", " + YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES
+ + " = " + summaryEntityTypes);
+ }
+ LOG.debug(debugMSG.toString());
+ }
+ }
+
+ @Override
+ public TimelinePutResponse putEntities(
+ ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException {
+ if (appAttemptId == null) {
+ return putEntities(entities);
+ }
+
+ List<TimelineEntity> entitiesToDBStore = new ArrayList<TimelineEntity>();
+ List<TimelineEntity> entitiesToSummaryCache
+ = new ArrayList<TimelineEntity>();
+ List<TimelineEntity> entitiesToEntityCache
+ = new ArrayList<TimelineEntity>();
+ Path attemptDir = createAttemptDir(appAttemptId);
+
+ for (TimelineEntity entity : entities) {
+ if (summaryEntityTypes.contains(entity.getEntityType())) {
+ entitiesToSummaryCache.add(entity);
+ } else {
+ if (groupId != null) {
+ entitiesToEntityCache.add(entity);
+ } else {
+ entitiesToDBStore.add(entity);
+ }
+ }
+ }
+
+ if (!entitiesToSummaryCache.isEmpty()) {
+ Path summaryLogPath =
+ new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing summary log for " + appAttemptId.toString() + " to "
+ + summaryLogPath);
+ }
+ this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper,
+ appAttemptId, entitiesToSummaryCache, isAppendSupported);
+ }
+
+ if (!entitiesToEntityCache.isEmpty()) {
+ Path entityLogPath =
+ new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing entity log for " + groupId.toString() + " to "
+ + entityLogPath);
+ }
+ this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper,
+ appAttemptId, groupId, entitiesToEntityCache, isAppendSupported);
+ }
+
+ if (!entitiesToDBStore.isEmpty()) {
+ putEntities(entitiesToDBStore.toArray(
+ new TimelineEntity[entitiesToDBStore.size()]));
+ }
+
+ return new TimelinePutResponse();
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException {
+ if (appAttemptId == null) {
+ putDomain(domain);
+ } else {
+ writeDomain(appAttemptId, domain);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (this.logFDsCache != null) {
+ this.logFDsCache.close();
+ }
+ }
+
+ private ObjectMapper createObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+ mapper.setSerializationInclusion(Inclusion.NON_NULL);
+ mapper.configure(Feature.CLOSE_CLOSEABLE, false);
+ return mapper;
+ }
+
+ private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ Path appDir = createApplicationDir(appAttemptId.getApplicationId());
+
+ Path attemptDir = new Path(appDir, appAttemptId.toString());
+ if (!fs.exists(attemptDir)) {
+ FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+ APP_LOG_DIR_PERMISSIONS));
+ }
+ return attemptDir;
+ }
+
+ private Path createApplicationDir(ApplicationId appId) throws IOException {
+ Path appDir =
+ new Path(activePath, appId.toString());
+ if (!fs.exists(appDir)) {
+ FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+ }
+ return appDir;
+ }
+
+ private void writeDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException {
+ Path domainLogPath =
+ new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+ + appAttemptId.toString());
+ LOG.info("Writing domains for " + appAttemptId.toString() + " to "
+ + domainLogPath);
+ this.logFDsCache.writeDomainLog(
+ fs, domainLogPath, objMapper, domain, isAppendSupported);
+ }
+
+ private static class DomainLogFD extends LogFD {
+ public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+ boolean isAppendSupported) throws IOException {
+ super(fs, logPath, objMapper, isAppendSupported);
+ }
+
+ public void writeDomain(TimelineDomain domain)
+ throws IOException {
+ getObjectMapper().writeValue(getJsonGenerator(), domain);
+ updateLastModifiedTime(System.currentTimeMillis());
+ }
+ }
+
+ private static class EntityLogFD extends LogFD {
+ public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+ boolean isAppendSupported) throws IOException {
+ super(fs, logPath, objMapper, isAppendSupported);
+ }
+
+ public void writeEntities(List<TimelineEntity> entities)
+ throws IOException {
+ if (writerClosed()) {
+ prepareForWrite();
+ }
+ for (TimelineEntity entity : entities) {
+ getObjectMapper().writeValue(getJsonGenerator(), entity);
+ }
+ updateLastModifiedTime(System.currentTimeMillis());
+ }
+ }
+
+ private static class LogFD {
+ private FSDataOutputStream stream;
+ private ObjectMapper objMapper;
+ private JsonGenerator jsonGenerator;
+ private long lastModifiedTime;
+ private final boolean isAppendSupported;
+ private final ReentrantLock fdLock = new ReentrantLock();
+ private final FileSystem fs;
+ private final Path logPath;
+
+ public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+ boolean isAppendSupported) throws IOException {
+ this.fs = fs;
+ this.logPath = logPath;
+ this.isAppendSupported = isAppendSupported;
+ this.objMapper = objMapper;
+ prepareForWrite();
+ }
+
+ public void close() {
+ if (stream != null) {
+ IOUtils.cleanup(LOG, jsonGenerator);
+ IOUtils.cleanup(LOG, stream);
+ stream = null;
+ jsonGenerator = null;
+ }
+ }
+
+ public void flush() throws IOException {
+ if (stream != null) {
+ stream.hflush();
+ }
+ }
+
+ public long getLastModifiedTime() {
+ return this.lastModifiedTime;
+ }
+
+ protected void prepareForWrite() throws IOException {
+ this.stream = createLogFileStream(fs, logPath);
+ this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
+ this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+ this.lastModifiedTime = System.currentTimeMillis();
+ }
+
+ protected boolean writerClosed() {
+ return stream == null;
+ }
+
+ private FSDataOutputStream createLogFileStream(FileSystem fileSystem,
+ Path logPathToCreate)
+ throws IOException {
+ FSDataOutputStream streamToCreate;
+ if (!isAppendSupported) {
+ logPathToCreate =
+ new Path(logPathToCreate.getParent(),
+ (logPathToCreate.getName() + "_" + System.currentTimeMillis()));
+ }
+ if (!fileSystem.exists(logPathToCreate)) {
+ streamToCreate = fileSystem.create(logPathToCreate, false);
+ fileSystem.setPermission(logPathToCreate,
+ new FsPermission(FILE_LOG_PERMISSIONS));
+ } else {
+ streamToCreate = fileSystem.append(logPathToCreate);
+ }
+ return streamToCreate;
+ }
+
+ public void lock() {
+ this.fdLock.lock();
+ }
+
+ public void unlock() {
+ this.fdLock.unlock();
+ }
+
+ protected JsonGenerator getJsonGenerator() {
+ return jsonGenerator;
+ }
+
+ protected ObjectMapper getObjectMapper() {
+ return objMapper;
+ }
+
+ protected void updateLastModifiedTime(long updatedTime) {
+ this.lastModifiedTime = updatedTime;
+ }
+ }
+
+ private static class LogFDsCache implements Closeable, Flushable {
+ private DomainLogFD domainLogFD;
+ private Map<ApplicationAttemptId, EntityLogFD> summaryLogFDs;
+ private Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDs;
+ private Timer flushTimer;
+ private FlushTimerTask flushTimerTask;
+ private Timer cleanInActiveFDsTimer;
+ private CleanInActiveFDsTask cleanInActiveFDsTask;
+ private final long ttl;
+ private final ReentrantLock domainFDLocker = new ReentrantLock();
+ private final ReentrantLock summaryTableLocker = new ReentrantLock();
+ private final ReentrantLock entityTableLocker = new ReentrantLock();
+ private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
+ private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
+ private volatile boolean serviceStopped = false;
+
+ public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
+ long ttl) {
+ domainLogFD = null;
+ summaryLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
+ entityLogFDs = new HashMap<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>>();
+ this.flushTimer =
+ new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
+ true);
+ this.flushTimerTask = new FlushTimerTask();
+ this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
+ flushIntervalSecs * 1000);
+
+ this.cleanInActiveFDsTimer =
+ new Timer(LogFDsCache.class.getSimpleName() +
+ "cleanInActiveFDsTimer", true);
+ this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
+ this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
+ cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
+ this.ttl = ttl * 1000;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ this.domainFDLocker.lock();
+ if (domainLogFD != null) {
+ domainLogFD.flush();
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+
+ flushSummaryFDMap(copySummaryLogFDs(summaryLogFDs));
+
+ flushEntityFDMap(copyEntityLogFDs(entityLogFDs));
+ }
+
+ private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
+ Map<ApplicationAttemptId, EntityLogFD> summaryLogFDsToCopy) {
+ try {
+ summaryTableCopyLocker.lock();
+ return new HashMap<ApplicationAttemptId, EntityLogFD>(
+ summaryLogFDsToCopy);
+ } finally {
+ summaryTableCopyLocker.unlock();
+ }
+ }
+
+ private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+ EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
+ try {
+ entityTableCopyLocker.lock();
+ return new HashMap<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>>(entityLogFDsToCopy);
+ } finally {
+ entityTableCopyLocker.unlock();
+ }
+ }
+
+ private void flushSummaryFDMap(Map<ApplicationAttemptId,
+ EntityLogFD> logFDs) throws IOException {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+ .entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.flush();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+
+ private void flushEntityFDMap(Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+ EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+ = logFDMapEntry.getValue();
+ for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+ : logFDMap.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.flush();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+ }
+
+ private class FlushTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ flush();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(e);
+ }
+ }
+ }
+ }
+
+ private void cleanInActiveFDs() {
+ long currentTimeStamp = System.currentTimeMillis();
+ try {
+ this.domainFDLocker.lock();
+ if (domainLogFD != null) {
+ if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
+ domainLogFD.close();
+ domainLogFD = null;
+ }
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+
+ cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summaryLogFDs),
+ currentTimeStamp);
+
+ cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs),
+ currentTimeStamp);
+ }
+
+ private void cleanInActiveSummaryFDsforMap(
+ Map<ApplicationAttemptId, EntityLogFD> logFDs,
+ long currentTimeStamp) {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+ .entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+ logFD.close();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+
+ private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
+ long currentTimeStamp) {
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
+ : logFDs.entrySet()) {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+ = logFDMapEntry.getValue();
+ for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+ : logFDMap.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+ logFD.close();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+ }
+
+ private class CleanInActiveFDsTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ cleanInActiveFDs();
+ } catch (Exception e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ serviceStopped = true;
+
+ flushTimer.cancel();
+ cleanInActiveFDsTimer.cancel();
+
+ try {
+ this.domainFDLocker.lock();
+ if (domainLogFD != null) {
+ domainLogFD.close();
+ domainLogFD = null;
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+
+ closeSummaryFDs(summaryLogFDs);
+
+ closeEntityFDs(entityLogFDs);
+ }
+
+ private void closeEntityFDs(Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
+ try {
+ entityTableLocker.lock();
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
+ : logFDs.entrySet()) {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+ = logFDMapEntry.getValue();
+ for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+ : logFDMap.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.close();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ }
+ } finally {
+ entityTableLocker.unlock();
+ }
+ }
+
+ private void closeSummaryFDs(
+ Map<ApplicationAttemptId, EntityLogFD> logFDs) {
+ try {
+ summaryTableLocker.lock();
+ if (!logFDs.isEmpty()) {
+ for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
+ : logFDs.entrySet()) {
+ EntityLogFD logFD = logFDEntry.getValue();
+ try {
+ logFD.lock();
+ logFD.close();
+ } finally {
+ logFD.unlock();
+ }
+ }
+ }
+ } finally {
+ summaryTableLocker.unlock();
+ }
+ }
+
+ public void writeDomainLog(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, TimelineDomain domain,
+ boolean isAppendSupported) throws IOException {
+ try {
+ this.domainFDLocker.lock();
+ if (this.domainLogFD != null) {
+ this.domainLogFD.writeDomain(domain);
+ } else {
+ this.domainLogFD =
+ new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
+ this.domainLogFD.writeDomain(domain);
+ }
+ } finally {
+ this.domainFDLocker.unlock();
+ }
+ }
+
+ public void writeEntityLogs(FileSystem fs, Path entityLogPath,
+ ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
+ boolean isAppendSupported) throws IOException {
+ writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
+ groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
+ }
+
+ private void writeEntityLogs(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ TimelineEntityGroupId groupId, List<TimelineEntity> entities,
+ boolean isAppendSupported, Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
+ HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD
+ = logFDs.get(attemptId);
+ if (logMapFD != null) {
+ EntityLogFD logFD = logMapFD.get(groupId);
+ if (logFD != null) {
+ try {
+ logFD.lock();
+ if (serviceStopped) {
+ return;
+ }
+ logFD.writeEntities(entities);
+ } finally {
+ logFD.unlock();
+ }
+ } else {
+ createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
+ entities, isAppendSupported, logFDs);
+ }
+ } else {
+ createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
+ entities, isAppendSupported, logFDs);
+ }
+ }
+
+ private void createEntityFDandWrite(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ TimelineEntityGroupId groupId, List<TimelineEntity> entities,
+ boolean isAppendSupported, Map<ApplicationAttemptId,
+ HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
+ try {
+ entityTableLocker.lock();
+ if (serviceStopped) {
+ return;
+ }
+ HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
+ logFDs.get(attemptId);
+ if (logFDMap == null) {
+ logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
+ }
+ EntityLogFD logFD = logFDMap.get(groupId);
+ if (logFD == null) {
+ logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+ }
+ try {
+ logFD.lock();
+ logFD.writeEntities(entities);
+ try {
+ entityTableCopyLocker.lock();
+ logFDMap.put(groupId, logFD);
+ logFDs.put(attemptId, logFDMap);
+ } finally {
+ entityTableCopyLocker.unlock();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ } finally {
+ entityTableLocker.unlock();
+ }
+ }
+
+ public void writeSummaryEntityLogs(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ List<TimelineEntity> entities, boolean isAppendSupported)
+ throws IOException {
+ writeSummaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
+ isAppendSupported, this.summaryLogFDs);
+ }
+
+ private void writeSummaryEntityLogs(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ List<TimelineEntity> entities, boolean isAppendSupported,
+ Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+ EntityLogFD logFD = logFDs.get(attemptId);
+ if (logFD != null) {
+ try {
+ logFD.lock();
+ if (serviceStopped) {
+ return;
+ }
+ logFD.writeEntities(entities);
+ } finally {
+ logFD.unlock();
+ }
+ } else {
+ createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities,
+ isAppendSupported, logFDs);
+ }
+ }
+
+ private void createSummaryFDAndWrite(FileSystem fs, Path logPath,
+ ObjectMapper objMapper, ApplicationAttemptId attemptId,
+ List<TimelineEntity> entities, boolean isAppendSupported,
+ Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+ try {
+ summaryTableLocker.lock();
+ if (serviceStopped) {
+ return;
+ }
+ EntityLogFD logFD = logFDs.get(attemptId);
+ if (logFD == null) {
+ logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+ }
+ try {
+ logFD.lock();
+ logFD.writeEntities(entities);
+ try {
+ summaryTableCopyLocker.lock();
+ logFDs.put(attemptId, logFD);
+ } finally {
+ summaryTableCopyLocker.unlock();
+ }
+ } finally {
+ logFD.unlock();
+ }
+ } finally {
+ summaryTableLocker.unlock();
+ }
+ }
+ }
+}
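
Net effect on disk: for each attempt the writer creates one directory per
application and one per attempt under the active directory, holding a domain
log, a summary log, and one entity log per entity group (when append is not
supported, a timestamp suffix is added on each reopen). A path sketch
mirroring createAttemptDir() and the log-prefix constants above, reusing the
sample ids from the earlier sketches:

    import org.apache.hadoop.fs.Path;

    Path active = new Path("/tmp/entity-file-history/active");
    Path appDir = new Path(active, "application_1234_0001");
    Path attemptDir = new Path(appDir, "appattempt_1234_0001_000001");
    // One domain log and one summary log per attempt:
    Path domainLog =
        new Path(attemptDir, "domainlog-appattempt_1234_0001_000001");
    Path summaryLog =
        new Path(attemptDir, "summarylog-appattempt_1234_0001_000001");
    // One entity log per TimelineEntityGroupId:
    Path entityLog =
        new Path(attemptDir, "entitylog-timelineEntityGroupId_1234_1_group_1");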
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 019c7a5a119..195a6617f90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -33,8 +33,6 @@ import java.security.PrivilegedExceptionAction;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -54,19 +52,19 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -74,7 +72,6 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
@@ -110,6 +107,9 @@ public class TimelineClientImpl extends TimelineClient {
private URI resURI;
private UserGroupInformation authUgi;
private String doAsUser;
+ private Configuration configuration;
+ private float timelineServiceVersion;
+ private TimelineWriter timelineWriter;
@Private
@VisibleForTesting
@@ -254,6 +254,7 @@ public class TimelineClientImpl extends TimelineClient {
}
protected void serviceInit(Configuration conf) throws Exception {
+ this.configuration = conf;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
@@ -293,58 +294,48 @@ public class TimelineClientImpl extends TimelineClient {
RESOURCE_URI_STR));
}
LOG.info("Timeline service address: " + resURI);
+ timelineServiceVersion =
+ conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
super.serviceInit(conf);
}
+ @Override
+ protected void serviceStart() throws Exception {
+ timelineWriter = createTimelineWriter(
+ configuration, authUgi, client, resURI);
+ }
+
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation ugi, Client webClient, URI uri)
+ throws IOException {
+ if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
+ return new FileSystemTimelineWriter(
+ conf, ugi, webClient, uri);
+ } else {
+ return new DirectTimelineWriter(ugi, webClient, uri);
+ }
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (this.timelineWriter != null) {
+ this.timelineWriter.close();
+ }
+ super.serviceStop();
+ }
+
@Override
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
- TimelineEntities entitiesContainer = new TimelineEntities();
- for (TimelineEntity entity : entities) {
- if (entity.getEntityId() == null || entity.getEntityType() == null) {
- throw new YarnException("Incomplete entity without entity id/type");
- }
- entitiesContainer.addEntity(entity);
- }
- ClientResponse resp = doPosting(entitiesContainer, null);
- return resp.getEntity(TimelinePutResponse.class);
+ return timelineWriter.putEntities(entities);
}
@Override
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
- doPosting(domain, "domain");
- }
-
- private ClientResponse doPosting(final Object obj, final String path)
- throws IOException, YarnException {
- ClientResponse resp;
- try {
- resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
- @Override
- public ClientResponse run() throws Exception {
- return doPostingObject(obj, path);
- }
- });
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- }
- if (resp == null ||
- resp.getClientResponseStatus() != ClientResponse.Status.OK) {
- String msg =
- "Failed to get the response from the timeline server.";
- LOG.error(msg);
- if (LOG.isDebugEnabled() && resp != null) {
- String output = resp.getEntity(String.class);
- LOG.debug("HTTP error code: " + resp.getStatus()
- + " Server response : \n" + output);
- }
- throw new YarnException(msg);
- }
- return resp;
+ timelineWriter.putDomain(domain);
}
@SuppressWarnings("unchecked")
@@ -470,23 +461,6 @@ public class TimelineClientImpl extends TimelineClient {
return connectionRetry.retryOn(tokenRetryOp);
}
- @Private
- @VisibleForTesting
- public ClientResponse doPostingObject(Object object, String path) {
- WebResource webResource = client.resource(resURI);
- if (path == null) {
- return webResource.accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .post(ClientResponse.class, object);
- } else if (path.equals("domain")) {
- return webResource.path(path).accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .put(ClientResponse.class, object);
- } else {
- throw new YarnRuntimeException("Unknown resource type");
- }
- }
-
private class TimelineURLConnectionFactory
implements HttpURLConnectionFactory {
@@ -663,4 +637,34 @@ public class TimelineClientImpl extends TimelineClient {
public UserGroupInformation getUgi() {
return authUgi;
}
+
+ @Override
+ public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+ TimelineEntityGroupId groupId, TimelineEntity... entities)
+ throws IOException, YarnException {
+ if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
+ throw new YarnException(
+ "This API is not supported under current Timeline Service Version: "
+ + timelineServiceVersion);
+ }
+
+ return timelineWriter.putEntities(appAttemptId, groupId, entities);
+ }
+
+ @Override
+ public void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException {
+ if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
+ throw new YarnException(
+ "This API is not supported under current Timeline Service Version: "
+ + timelineServiceVersion);
+ }
+ timelineWriter.putDomain(appAttemptId, domain);
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setTimelineWriter(TimelineWriter writer) {
+ this.timelineWriter = writer;
+ }
}
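
The writer implementation is chosen once, in serviceStart(), purely from the
configured timeline service version. A sketch of switching a client onto the
v1.5 path (note that FileSystemTimelineWriter's constructor above fails fast
if the active directory does not exist):

    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);

    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    // start() builds a FileSystemTimelineWriter for version 1.5; any other
    // version falls back to DirectTimelineWriter (plain REST posting).
    client.start();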
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
new file mode 100644
index 00000000000..c616e639365
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * Base writer class to write the Timeline data.
+ */
+@Private
+@Unstable
+public abstract class TimelineWriter {
+
+ private static final Log LOG = LogFactory
+ .getLog(TimelineWriter.class);
+
+ private UserGroupInformation authUgi;
+ private Client client;
+ private URI resURI;
+
+ public TimelineWriter(UserGroupInformation authUgi, Client client,
+ URI resURI) {
+ this.authUgi = authUgi;
+ this.client = client;
+ this.resURI = resURI;
+ }
+
+ public void close() throws Exception {
+ // DO NOTHING
+ }
+
+ public TimelinePutResponse putEntities(
+ TimelineEntity... entities) throws IOException, YarnException {
+ TimelineEntities entitiesContainer = new TimelineEntities();
+ for (TimelineEntity entity : entities) {
+ if (entity.getEntityId() == null || entity.getEntityType() == null) {
+ throw new YarnException("Incomplete entity without entity id/type");
+ }
+ entitiesContainer.addEntity(entity);
+ }
+ ClientResponse resp = doPosting(entitiesContainer, null);
+ return resp.getEntity(TimelinePutResponse.class);
+ }
+
+ public void putDomain(TimelineDomain domain) throws IOException,
+ YarnException {
+ doPosting(domain, "domain");
+ }
+
+ public abstract TimelinePutResponse putEntities(
+ ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+ TimelineEntity... entities) throws IOException, YarnException;
+
+ public abstract void putDomain(ApplicationAttemptId appAttemptId,
+ TimelineDomain domain) throws IOException, YarnException;
+
+ private ClientResponse doPosting(final Object obj, final String path)
+ throws IOException, YarnException {
+ ClientResponse resp;
+ try {
+ resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
+ @Override
+ public ClientResponse run() throws Exception {
+ return doPostingObject(obj, path);
+ }
+ });
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg);
+ if (LOG.isDebugEnabled() && resp != null) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
+ throw new YarnException(msg);
+ }
+ return resp;
+ }
+
+ @Private
+ @VisibleForTesting
+ public ClientResponse doPostingObject(Object object, String path) {
+ WebResource webResource = client.resource(resURI);
+ if (path == null) {
+ return webResource.accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .post(ClientResponse.class, object);
+ } else if (path.equals("domain")) {
+ return webResource.path(path).accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class, object);
+ } else {
+ throw new YarnRuntimeException("Unknown resource type");
+ }
+ }
+}
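
Concrete writers only have to supply the two v1.5 entry points; the REST
plumbing in doPosting()/doPostingObject() is inherited. A minimal, purely
illustrative subclass (not part of this patch) that accepts and drops v1.5
writes:

    class NoOpTimelineWriter extends TimelineWriter {
      NoOpTimelineWriter(UserGroupInformation ugi, Client client, URI uri) {
        super(ugi, client, uri);
      }

      @Override
      public TimelinePutResponse putEntities(ApplicationAttemptId attemptId,
          TimelineEntityGroupId groupId, TimelineEntity... entities) {
        return new TimelinePutResponse(); // report nothing stored, no errors
      }

      @Override
      public void putDomain(ApplicationAttemptId attemptId,
          TimelineDomain domain) {
        // intentionally a no-op
      }
    }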
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
new file mode 100644
index 00000000000..55b149640d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimelineEntityGroupId {
+
+ @Test
+ public void testTimelineEntityGroupId() {
+ ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+ TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
+ TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
+ TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
+ TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");
+
+ Assert.assertTrue(group1.equals(group4));
+ Assert.assertFalse(group1.equals(group2));
+ Assert.assertFalse(group1.equals(group3));
+
+ Assert.assertTrue(group1.compareTo(group4) == 0);
+ Assert.assertTrue(group1.compareTo(group2) < 0);
+ Assert.assertTrue(group1.compareTo(group3) < 0);
+
+ Assert.assertTrue(group1.hashCode() == group4.hashCode());
+ Assert.assertFalse(group1.hashCode() == group2.hashCode());
+ Assert.assertFalse(group1.hashCode() == group3.hashCode());
+
+ Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
+ Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index 4c74c618ac1..39fc8deb3ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -25,8 +25,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.net.ConnectException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
@@ -37,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@@ -46,17 +48,20 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineClient {
private TimelineClientImpl client;
+ private TimelineWriter spyTimelineWriter;
@Before
public void setup() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
client = createTimelineClient(conf);
}
@@ -69,7 +74,8 @@ public class TestTimelineClient {
@Test
public void testPostEntities() throws Exception {
- mockEntityClientResponse(client, ClientResponse.Status.OK, false, false);
+ mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK,
+ false, false);
try {
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(0, response.getErrors().size());
@@ -80,7 +86,8 @@ public class TestTimelineClient {
@Test
public void testPostEntitiesWithError() throws Exception {
- mockEntityClientResponse(client, ClientResponse.Status.OK, true, false);
+ mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, true,
+ false);
try {
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(1, response.getErrors().size());
@@ -106,8 +113,8 @@ public class TestTimelineClient {
@Test
public void testPostEntitiesNoResponse() throws Exception {
- mockEntityClientResponse(
- client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ mockEntityClientResponse(spyTimelineWriter,
+ ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
try {
client.putEntities(generateEntity());
Assert.fail("Exception is expected");
@@ -119,7 +126,7 @@ public class TestTimelineClient {
@Test
public void testPostEntitiesConnectionRefused() throws Exception {
- mockEntityClientResponse(client, null, false, true);
+ mockEntityClientResponse(spyTimelineWriter, null, false, true);
try {
client.putEntities(generateEntity());
Assert.fail("RuntimeException is expected");
@@ -130,7 +137,7 @@ public class TestTimelineClient {
@Test
public void testPutDomain() throws Exception {
- mockDomainClientResponse(client, ClientResponse.Status.OK, false);
+ mockDomainClientResponse(spyTimelineWriter, ClientResponse.Status.OK, false);
try {
client.putDomain(generateDomain());
} catch (YarnException e) {
@@ -140,7 +147,8 @@ public class TestTimelineClient {
@Test
public void testPutDomainNoResponse() throws Exception {
- mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false);
+ mockDomainClientResponse(spyTimelineWriter,
+ ClientResponse.Status.FORBIDDEN, false);
try {
client.putDomain(generateDomain());
Assert.fail("Exception is expected");
@@ -152,7 +160,7 @@ public class TestTimelineClient {
@Test
public void testPutDomainConnectionRefused() throws Exception {
- mockDomainClientResponse(client, null, true);
+ mockDomainClientResponse(spyTimelineWriter, null, true);
try {
client.putDomain(generateDomain());
Assert.fail("RuntimeException is expected");
@@ -291,15 +299,16 @@ public class TestTimelineClient {
}
private static ClientResponse mockEntityClientResponse(
- TimelineClientImpl client, ClientResponse.Status status,
+ TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasError, boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class);
if (hasRuntimeError) {
- doThrow(new ClientHandlerException(new ConnectException())).when(client)
- .doPostingObject(any(TimelineEntities.class), any(String.class));
+ doThrow(new ClientHandlerException(new ConnectException())).when(
+ spyTimelineWriter).doPostingObject(
+ any(TimelineEntities.class), any(String.class));
return response;
}
- doReturn(response).when(client)
+ doReturn(response).when(spyTimelineWriter)
.doPostingObject(any(TimelineEntities.class), any(String.class));
when(response.getClientResponseStatus()).thenReturn(status);
TimelinePutResponse.TimelinePutError error =
@@ -316,15 +325,16 @@ public class TestTimelineClient {
}
private static ClientResponse mockDomainClientResponse(
- TimelineClientImpl client, ClientResponse.Status status,
+ TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class);
if (hasRuntimeError) {
- doThrow(new ClientHandlerException(new ConnectException())).when(client)
- .doPostingObject(any(TimelineDomain.class), any(String.class));
+ doThrow(new ClientHandlerException(new ConnectException())).when(
+ spyTimelineWriter).doPostingObject(any(TimelineDomain.class),
+ any(String.class));
return response;
}
- doReturn(response).when(client)
+ doReturn(response).when(spyTimelineWriter)
.doPostingObject(any(TimelineDomain.class), any(String.class));
when(response.getClientResponseStatus()).thenReturn(status);
return response;
@@ -365,10 +375,19 @@ public class TestTimelineClient {
return domain;
}
- private static TimelineClientImpl createTimelineClient(
+ private TimelineClientImpl createTimelineClient(
YarnConfiguration conf) {
- TimelineClientImpl client =
- spy((TimelineClientImpl) TimelineClient.createTimelineClient());
+ TimelineClientImpl client = new TimelineClientImpl() {
+ @Override
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
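+ // return a Mockito spy so tests can stub and verify doPostingObject calls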
+ TimelineWriter timelineWriter =
+ new DirectTimelineWriter(authUgi, client, resURI);
+ spyTimelineWriter = spy(timelineWriter);
+ return spyTimelineWriter;
+ }
+ };
client.init(conf);
client.start();
return client;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
new file mode 100644
index 00000000000..37eadbfa8dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClientForATS1_5 {
+
+ protected static Log LOG = LogFactory
+ .getLog(TestTimelineClientForATS1_5.class);
+
+ private TimelineClientImpl client;
+ private static FileContext localFS;
+ private static File localActiveDir;
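+ // writer spy captured in createTimelineClient() so tests can verify calls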
+ private TimelineWriter spyTimelineWriter;
+
+ @Before
+ public void setup() throws Exception {
+ localFS = FileContext.getLocalFSFileContext();
+ localActiveDir =
+ new File("target", this.getClass().getSimpleName() + "-activeDir")
+ .getAbsoluteFile();
+ localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+ localActiveDir.mkdir();
+ LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
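+ // version 1.5 makes the client write through a FileSystemTimelineWriter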
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ localActiveDir.getAbsolutePath());
+ conf.set(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+ "summary_type");
+ client = createTimelineClient(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (client != null) {
+ client.stop();
+ }
+ localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+ }
+
+ @Test
+ public void testPostEntities() throws Exception {
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TimelineEntityGroupId groupId =
+ TimelineEntityGroupId.newInstance(appId, "1");
+ TimelineEntityGroupId groupId2 =
+ TimelineEntityGroupId.newInstance(appId, "2");
+ // create two entities: one of a regular entity type and one of the
+ // configured summary type
+ TimelineEntity[] entities = new TimelineEntity[2];
+ entities[0] = generateEntity("entity_type");
+ entities[1] = generateEntity("summary_type");
+ try {
+ // if the attemptId is null, fall back to the original putEntities call
+ // and save the entities into the configured LevelDB store
+ client.putEntities(null, null, entities);
+ verify(spyTimelineWriter, times(1)).putEntities(entities);
+ reset(spyTimelineWriter);
+
+ // if the attemptId is specified but the groupId is null, entities of a
+ // regular type still fall back to the original putEntities call, while
+ // summary-type entities are written into the FS
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId, 1);
+ client.putEntities(attemptId1, null, entities);
+ TimelineEntity[] entityTDB = new TimelineEntity[1];
+ entityTDB[0] = entities[0];
+ verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId1), "summarylog-"
+ + attemptId1.toString())));
+ reset(spyTimelineWriter);
+
+ // if both attemptId and groupId are specified, the entities are saved
+ // into the FileSystem instead of the LevelDB store
+ ApplicationAttemptId attemptId2 =
+ ApplicationAttemptId.newInstance(appId, 2);
+ client.putEntities(attemptId2, groupId, entities);
+ client.putEntities(attemptId2, groupId2, entities);
+ verify(spyTimelineWriter, times(0)).putEntities(
+ any(TimelineEntity[].class));
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId2), "summarylog-"
+ + attemptId2.toString())));
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ + groupId.toString())));
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ + groupId2.toString())));
+ reset(spyTimelineWriter);
+ } catch (Exception e) {
+ Assert.fail("Exception is not expected. " + e);
+ }
+ }
+
+ @Test
+ public void testPutDomain() {
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId, 1);
+ try {
+ TimelineDomain domain = generateDomain();
+
+ client.putDomain(null, domain);
+ verify(spyTimelineWriter, times(1)).putDomain(domain);
+ reset(spyTimelineWriter);
+
+ client.putDomain(attemptId1, domain);
+ verify(spyTimelineWriter, times(0)).putDomain(domain);
+ Assert.assertTrue(localFS.util().exists(
+ new Path(getAppAttemptDir(attemptId1), "domainlog-"
+ + attemptId1.toString())));
+ reset(spyTimelineWriter);
+ } catch (Exception e) {
+ Assert.fail("Exception is not expected." + e);
+ }
+ }
+
+ private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
+ Path appDir =
+ new Path(localActiveDir.getAbsolutePath(), appAttemptId
+ .getApplicationId().toString());
+ Path attemptDir = new Path(appDir, appAttemptId.toString());
+ return attemptDir;
+ }
+
+ private static TimelineEntity generateEntity(String type) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId("entity id");
+ entity.setEntityType(type);
+ entity.setStartTime(System.currentTimeMillis());
+ return entity;
+ }
+
+ private static TimelineDomain generateDomain() {
+ TimelineDomain domain = new TimelineDomain();
+ domain.setId("namesapce id");
+ domain.setDescription("domain description");
+ domain.setOwner("domain owner");
+ domain.setReaders("domain_reader");
+ domain.setWriters("domain_writer");
+ domain.setCreatedTime(0L);
+ domain.setModifiedTime(1L);
+ return domain;
+ }
+
+ private TimelineClientImpl createTimelineClient(YarnConfiguration conf) {
+ TimelineClientImpl client = new TimelineClientImpl() {
+ @Override
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
+ TimelineWriter timelineWriter =
+ new FileSystemTimelineWriter(conf, authUgi, client, resURI) {
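+ // stub out the real HTTP post; these tests only inspect the FS logs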
+ @Override
+ public ClientResponse doPostingObject(Object object, String path) {
+ ClientResponse response = mock(ClientResponse.class);
+ when(response.getClientResponseStatus()).thenReturn(
+ ClientResponse.Status.OK);
+ return response;
+ }
+ };
+ spyTimelineWriter = spy(timelineWriter);
+ return spyTimelineWriter;
+ }
+ };
+
+ client.init(conf);
+ client.start();
+ return client;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
index 5cb1baa37d3..46d5b6b873c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
@@ -19,15 +19,20 @@
package org.apache.hadoop.yarn.server.timeline.webapp;
import java.io.File;
+import java.io.IOException;
+import java.net.URI;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
@@ -39,6 +44,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineWebServicesWithSSL {
@@ -60,6 +66,7 @@ public class TestTimelineWebServicesWithSSL {
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
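+ // pin the timeline service to v1.0 so the client posts via
+ // DirectTimelineWriter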
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
@@ -123,11 +130,17 @@ public class TestTimelineWebServicesWithSSL {
private ClientResponse resp;
@Override
- public ClientResponse doPostingObject(Object obj, String path) {
- resp = super.doPostingObject(obj, path);
- return resp;
+ protected TimelineWriter createTimelineWriter(Configuration conf,
+ UserGroupInformation authUgi, Client client, URI resURI)
+ throws IOException {
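+ // wrap the direct writer so each posted response can be captured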
+ return new DirectTimelineWriter(authUgi, client, resURI) {
+ @Override
+ public ClientResponse doPostingObject(Object obj, String path) {
+ resp = super.doPostingObject(obj, path);
+ return resp;
+ }
+ };
}
-
}
}