YARN-4234. New put APIs in TimelineClient for ats v1.5. Contributed by Xuan Gong.

(cherry picked from commit 882f2f0464)
This commit is contained in:
Junping Du 2015-12-23 05:26:51 -08:00
parent 08ddb536eb
commit 6e97a3c968
13 changed files with 1931 additions and 93 deletions

View File

@ -233,6 +233,9 @@ Release 2.8.0 - UNRELEASED
YARN-3458. CPU resource monitoring in Windows. (Inigo Goiri via cnauroth) YARN-3458. CPU resource monitoring in Windows. (Inigo Goiri via cnauroth)
YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
junping_du)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timeline;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.base.Splitter;
/**
* <p><code>TimelineEntityGroupId</code> is an abstract way for
* timeline service users to represent a group of related timeline data.
* For example, all entities that represents one data flow DAG execution
* can be grouped into one timeline entity group. </p>
*/
@Public
@Unstable
public class TimelineEntityGroupId implements
Comparable<TimelineEntityGroupId> {
private static final Splitter SPLITTER = Splitter.on('_').trimResults();
private ApplicationId applicationId;
private String id;
@Private
@Unstable
public static final String TIMELINE_ENTITY_GROUPID_STR_PREFIX =
"timelineEntityGroupId";
public TimelineEntityGroupId() {
}
public static TimelineEntityGroupId newInstance(ApplicationId applicationId,
String id) {
TimelineEntityGroupId timelineEntityGroupId =
new TimelineEntityGroupId();
timelineEntityGroupId.setApplicationId(applicationId);
timelineEntityGroupId.setTimelineEntityGroupId(id);
return timelineEntityGroupId;
}
/**
* Get the <code>ApplicationId</code> of the
* <code>TimelineEntityGroupId</code>.
*
* @return <code>ApplicationId</code> of the
* <code>TimelineEntityGroupId</code>
*/
public ApplicationId getApplicationId() {
return this.applicationId;
}
public void setApplicationId(ApplicationId appID) {
this.applicationId = appID;
}
/**
* Get the <code>timelineEntityGroupId</code>.
*
* @return <code>timelineEntityGroupId</code>
*/
public String getTimelineEntityGroupId() {
return this.id;
}
@Private
@Unstable
protected void setTimelineEntityGroupId(String timelineEntityGroupId) {
this.id = timelineEntityGroupId;
}
@Override
public int hashCode() {
int result = getTimelineEntityGroupId().hashCode();
result = 31 * result + getApplicationId().hashCode();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
TimelineEntityGroupId otherObject = (TimelineEntityGroupId) obj;
if (!this.getApplicationId().equals(otherObject.getApplicationId())) {
return false;
}
if (!this.getTimelineEntityGroupId().equals(
otherObject.getTimelineEntityGroupId())) {
return false;
}
return true;
}
@Override
public int compareTo(TimelineEntityGroupId other) {
int compareAppIds =
this.getApplicationId().compareTo(other.getApplicationId());
if (compareAppIds == 0) {
return this.getTimelineEntityGroupId().compareTo(
other.getTimelineEntityGroupId());
} else {
return compareAppIds;
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(TIMELINE_ENTITY_GROUPID_STR_PREFIX + "_");
ApplicationId appId = getApplicationId();
sb.append(appId.getClusterTimestamp()).append("_");
sb.append(appId.getId()).append("_");
sb.append(getTimelineEntityGroupId());
return sb.toString();
}
public static TimelineEntityGroupId
fromString(String timelineEntityGroupIdStr) {
StringBuffer buf = new StringBuffer();
Iterator<String> it = SPLITTER.split(timelineEntityGroupIdStr).iterator();
if (!it.next().equals(TIMELINE_ENTITY_GROUPID_STR_PREFIX)) {
throw new IllegalArgumentException(
"Invalid TimelineEntityGroupId prefix: " + timelineEntityGroupIdStr);
}
ApplicationId appId =
ApplicationId.newInstance(Long.parseLong(it.next()),
Integer.parseInt(it.next()));
buf.append(it.next());
while (it.hasNext()) {
buf.append("_");
buf.append(it.next());
}
return TimelineEntityGroupId.newInstance(appId, buf.toString());
}
}

View File

@ -1581,6 +1581,10 @@ private static void addDeprecatedKeys() {
public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX = public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-web-path."; TIMELINE_SERVICE_PREFIX + "ui-web-path.";
/** Timeline client settings */
public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
TIMELINE_SERVICE_PREFIX + "client.";
/** /**
* Path to war file or static content directory for this UI * Path to war file or static content directory for this UI
* (For pluggable UIs). * (For pluggable UIs).
@ -1588,6 +1592,45 @@ private static void addDeprecatedKeys() {
public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX = public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
TIMELINE_SERVICE_PREFIX + "ui-on-disk-path."; TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
/**
* The setting for timeline service v1.5
*/
public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX =
TIMELINE_SERVICE_PREFIX + "entity-group-fs-store.";
public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "active-dir";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
"/tmp/entity-file-history/active";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
public static final String
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
"2000, 500";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
public static final long
TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10;
public static final String TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-clean-interval-secs";
public static final long
TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60;
public static final String TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-retain-secs";
public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
5*60;
// mark app-history related configs @Private as application history is going // mark app-history related configs @Private as application history is going
// to be integrated into the timeline service // to be integrated into the timeline service
@Private @Private
@ -1628,8 +1671,8 @@ private static void addDeprecatedKeys() {
public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type"; APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type";
@Private @Private
public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = public static final String
"none"; DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = "none";
/** The setting that controls whether timeline service is enabled or not. */ /** The setting that controls whether timeline service is enabled or not. */
public static final String TIMELINE_SERVICE_ENABLED = public static final String TIMELINE_SERVICE_ENABLED =
@ -1678,7 +1721,7 @@ private static void addDeprecatedKeys() {
APPLICATION_HISTORY_PREFIX + "max-applications"; APPLICATION_HISTORY_PREFIX + "max-applications";
public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000; public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000;
/** Timeline service store class */ /** Timeline service store class. */
public static final String TIMELINE_SERVICE_STORE = public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class"; TIMELINE_SERVICE_PREFIX + "store-class";
@ -1791,10 +1834,6 @@ private static void addDeprecatedKeys() {
public static final boolean public static final boolean
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false; TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
/** Timeline client settings */
public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
TIMELINE_SERVICE_PREFIX + "client.";
/** Timeline client call, max retries (-1 means no limit) */ /** Timeline client call, max retries (-1 means no limit) */
public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES = public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES =
TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries"; TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries";

View File

@ -26,8 +26,10 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -78,6 +80,28 @@ protected TimelineClient(String name) {
public abstract TimelinePutResponse putEntities( public abstract TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException; TimelineEntity... entities) throws IOException, YarnException;
/**
* <p>
* Send the information of a number of conceptual entities to the timeline
* server. It is a blocking API. The method will not return until it gets the
* response from the timeline server.
*
* This API is only for timeline service v1.5
* </p>
*
* @param appAttemptId {@link ApplicationAttemptId}
* @param groupId {@link TimelineEntityGroupId}
* @param entities
* the collection of {@link TimelineEntity}
* @return the error information if the sent entities are not correctly stored
* @throws IOException
* @throws YarnException
*/
@Public
public abstract TimelinePutResponse putEntities(
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
TimelineEntity... entities) throws IOException, YarnException;
/** /**
* <p> * <p>
* Send the information of a domain to the timeline server. It is a * Send the information of a domain to the timeline server. It is a
@ -94,6 +118,25 @@ public abstract TimelinePutResponse putEntities(
public abstract void putDomain( public abstract void putDomain(
TimelineDomain domain) throws IOException, YarnException; TimelineDomain domain) throws IOException, YarnException;
/**
* <p>
* Send the information of a domain to the timeline server. It is a
* blocking API. The method will not return until it gets the response from
* the timeline server.
*
* This API is only for timeline service v1.5
* </p>
*
* @param domain
* an {@link TimelineDomain} object
* @param appAttemptId {@link ApplicationAttemptId}
* @throws IOException
* @throws YarnException
*/
@Public
public abstract void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException;
/** /**
* <p> * <p>
* Get a delegation token so as to be able to talk to the timeline server in a * Get a delegation token so as to be able to talk to the timeline server in a

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.sun.jersey.api.client.Client;
/**
* A simple writer class for storing Timeline data into Leveldb store.
*/
@Private
@Unstable
public class DirectTimelineWriter extends TimelineWriter{
private static final Log LOG = LogFactory
.getLog(DirectTimelineWriter.class);
public DirectTimelineWriter(UserGroupInformation authUgi,
Client client, URI resURI) {
super(authUgi, client, resURI);
}
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
TimelineEntityGroupId groupId, TimelineEntity... entities)
throws IOException, YarnException {
throw new IOException("Not supported");
}
@Override
public void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException {
throw new IOException("Not supported");
}
}

View File

@ -0,0 +1,847 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig.Feature;
import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
import com.sun.jersey.api.client.Client;
/**
* A simple writer class for storing Timeline data in any storage that
* implements a basic FileSystem interface.
* This writer is used for ATSv1.5.
*/
@Private
@Unstable
public class FileSystemTimelineWriter extends TimelineWriter{
private static final Log LOG = LogFactory
.getLog(FileSystemTimelineWriter.class);
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
= YarnConfiguration.TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
// App log directory must be readable by group so server can access logs
// and writable by group so it can be deleted by server
private static final short APP_LOG_DIR_PERMISSIONS = 0770;
// Logs must be readable by group so server can access them
private static final short FILE_LOG_PERMISSIONS = 0640;
private static final String DOMAIN_LOG_PREFIX = "domainlog-";
private static final String SUMMARY_LOG_PREFIX = "summarylog-";
private static final String ENTITY_LOG_PREFIX = "entitylog-";
private Path activePath = null;
private FileSystem fs = null;
private Set<String> summaryEntityTypes;
private ObjectMapper objMapper = null;
private long flushIntervalSecs;
private long cleanIntervalSecs;
private long ttl;
private LogFDsCache logFDsCache = null;
private boolean isAppendSupported;
public FileSystemTimelineWriter(Configuration conf,
UserGroupInformation authUgi, Client client, URI resURI)
throws IOException {
super(authUgi, client, resURI);
Configuration fsConf = new Configuration(conf);
fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
String retryPolicy =
fsConf.get(YarnConfiguration.
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
fsConf.set("dfs.client.retry.policy.spec", retryPolicy);
activePath = new Path(fsConf.get(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
String scheme = activePath.toUri().getScheme();
if (scheme == null) {
scheme = FileSystem.getDefaultUri(fsConf).getScheme();
}
if (scheme != null) {
String disableCacheName = String.format("fs.%s.impl.disable.cache",
scheme);
fsConf.setBoolean(disableCacheName, true);
}
fs = activePath.getFileSystem(fsConf);
if (!fs.exists(activePath)) {
throw new IOException(activePath + " does not exist");
}
summaryEntityTypes = new HashSet<String>(
conf.getStringCollection(YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES));
flushIntervalSecs = conf.getLong(
YarnConfiguration
.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS,
YarnConfiguration
.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT);
cleanIntervalSecs = conf.getLong(
YarnConfiguration
.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS,
YarnConfiguration
.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT);
ttl = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
logFDsCache =
new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
this.isAppendSupported =
conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
objMapper = createObjectMapper();
if (LOG.isDebugEnabled()) {
StringBuilder debugMSG = new StringBuilder();
debugMSG.append(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS
+ "=" + flushIntervalSecs + ", " +
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS
+ "=" + cleanIntervalSecs + ", " +
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
+ "=" + ttl + ", " +
TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ "=" + isAppendSupported + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ "=" + activePath);
if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) {
debugMSG.append(", " + YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES
+ " = " + summaryEntityTypes);
}
LOG.debug(debugMSG.toString());
}
}
@Override
public TimelinePutResponse putEntities(
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
TimelineEntity... entities) throws IOException, YarnException {
if (appAttemptId == null) {
return putEntities(entities);
}
List<TimelineEntity> entitiesToDBStore = new ArrayList<TimelineEntity>();
List<TimelineEntity> entitiesToSummaryCache
= new ArrayList<TimelineEntity>();
List<TimelineEntity> entitiesToEntityCache
= new ArrayList<TimelineEntity>();
Path attemptDir = createAttemptDir(appAttemptId);
for (TimelineEntity entity : entities) {
if (summaryEntityTypes.contains(entity.getEntityType())) {
entitiesToSummaryCache.add(entity);
} else {
if (groupId != null) {
entitiesToEntityCache.add(entity);
} else {
entitiesToDBStore.add(entity);
}
}
}
if (!entitiesToSummaryCache.isEmpty()) {
Path summaryLogPath =
new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Writing summary log for " + appAttemptId.toString() + " to "
+ summaryLogPath);
}
this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper,
appAttemptId, entitiesToSummaryCache, isAppendSupported);
}
if (!entitiesToEntityCache.isEmpty()) {
Path entityLogPath =
new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Writing entity log for " + groupId.toString() + " to "
+ entityLogPath);
}
this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper,
appAttemptId, groupId, entitiesToEntityCache, isAppendSupported);
}
if (!entitiesToDBStore.isEmpty()) {
putEntities(entitiesToDBStore.toArray(
new TimelineEntity[entitiesToDBStore.size()]));
}
return new TimelinePutResponse();
}
@Override
public void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException {
if (appAttemptId == null) {
putDomain(domain);
} else {
writeDomain(appAttemptId, domain);
}
}
@Override
public void close() throws Exception {
if (this.logFDsCache != null) {
this.logFDsCache.close();
}
}
private ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
mapper.setSerializationInclusion(Inclusion.NON_NULL);
mapper.configure(Feature.CLOSE_CLOSEABLE, false);
return mapper;
}
private Path createAttemptDir(ApplicationAttemptId appAttemptId)
throws IOException {
Path appDir = createApplicationDir(appAttemptId.getApplicationId());
Path attemptDir = new Path(appDir, appAttemptId.toString());
if (!fs.exists(attemptDir)) {
FileSystem.mkdirs(fs, attemptDir, new FsPermission(
APP_LOG_DIR_PERMISSIONS));
}
return attemptDir;
}
private Path createApplicationDir(ApplicationId appId) throws IOException {
Path appDir =
new Path(activePath, appId.toString());
if (!fs.exists(appDir)) {
FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
}
return appDir;
}
private void writeDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException {
Path domainLogPath =
new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+ appAttemptId.toString());
LOG.info("Writing domains for " + appAttemptId.toString() + " to "
+ domainLogPath);
this.logFDsCache.writeDomainLog(
fs, domainLogPath, objMapper, domain, isAppendSupported);
}
private static class DomainLogFD extends LogFD {
public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
boolean isAppendSupported) throws IOException {
super(fs, logPath, objMapper, isAppendSupported);
}
public void writeDomain(TimelineDomain domain)
throws IOException {
getObjectMapper().writeValue(getJsonGenerator(), domain);
updateLastModifiedTime(System.currentTimeMillis());
}
}
private static class EntityLogFD extends LogFD {
public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
boolean isAppendSupported) throws IOException {
super(fs, logPath, objMapper, isAppendSupported);
}
public void writeEntities(List<TimelineEntity> entities)
throws IOException {
if (writerClosed()) {
prepareForWrite();
}
for (TimelineEntity entity : entities) {
getObjectMapper().writeValue(getJsonGenerator(), entity);
}
updateLastModifiedTime(System.currentTimeMillis());
}
}
private static class LogFD {
private FSDataOutputStream stream;
private ObjectMapper objMapper;
private JsonGenerator jsonGenerator;
private long lastModifiedTime;
private final boolean isAppendSupported;
private final ReentrantLock fdLock = new ReentrantLock();
private final FileSystem fs;
private final Path logPath;
public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
boolean isAppendSupported) throws IOException {
this.fs = fs;
this.logPath = logPath;
this.isAppendSupported = isAppendSupported;
this.objMapper = objMapper;
prepareForWrite();
}
public void close() {
if (stream != null) {
IOUtils.cleanup(LOG, jsonGenerator);
IOUtils.cleanup(LOG, stream);
stream = null;
jsonGenerator = null;
}
}
public void flush() throws IOException {
if (stream != null) {
stream.hflush();
}
}
public long getLastModifiedTime() {
return this.lastModifiedTime;
}
protected void prepareForWrite() throws IOException{
this.stream = createLogFileStream(fs, logPath);
this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
this.lastModifiedTime = System.currentTimeMillis();
}
protected boolean writerClosed() {
return stream == null;
}
private FSDataOutputStream createLogFileStream(FileSystem fileSystem,
Path logPathToCreate)
throws IOException {
FSDataOutputStream streamToCreate;
if (!isAppendSupported) {
logPathToCreate =
new Path(logPathToCreate.getParent(),
(logPathToCreate.getName() + "_" + System.currentTimeMillis()));
}
if (!fileSystem.exists(logPathToCreate)) {
streamToCreate = fileSystem.create(logPathToCreate, false);
fileSystem.setPermission(logPathToCreate,
new FsPermission(FILE_LOG_PERMISSIONS));
} else {
streamToCreate = fileSystem.append(logPathToCreate);
}
return streamToCreate;
}
public void lock() {
this.fdLock.lock();
}
public void unlock() {
this.fdLock.unlock();
}
protected JsonGenerator getJsonGenerator() {
return jsonGenerator;
}
protected ObjectMapper getObjectMapper() {
return objMapper;
}
protected void updateLastModifiedTime(long updatedTime) {
this.lastModifiedTime = updatedTime;
}
}
private static class LogFDsCache implements Closeable, Flushable{
private DomainLogFD domainLogFD;
private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> entityLogFDs;
private Timer flushTimer;
private FlushTimerTask flushTimerTask;
private Timer cleanInActiveFDsTimer;
private CleanInActiveFDsTask cleanInActiveFDsTask;
private final long ttl;
private final ReentrantLock domainFDLocker = new ReentrantLock();
private final ReentrantLock summaryTableLocker = new ReentrantLock();
private final ReentrantLock entityTableLocker = new ReentrantLock();
private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
private volatile boolean serviceStopped = false;
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
long ttl) {
domainLogFD = null;
summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
entityLogFDs = new HashMap<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>>();
this.flushTimer =
new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
true);
this.flushTimerTask = new FlushTimerTask();
this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
flushIntervalSecs * 1000);
this.cleanInActiveFDsTimer =
new Timer(LogFDsCache.class.getSimpleName() +
"cleanInActiveFDsTimer", true);
this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
this.ttl = ttl * 1000;
}
@Override
public void flush() throws IOException {
try {
this.domainFDLocker.lock();
if (domainLogFD != null) {
domainLogFD.flush();
}
} finally {
this.domainFDLocker.unlock();
}
flushSummaryFDMap(copySummaryLogFDs(summanyLogFDs));
flushEntityFDMap(copyEntityLogFDs(entityLogFDs));
}
private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) {
try {
summaryTableCopyLocker.lock();
return new HashMap<ApplicationAttemptId, EntityLogFD>(
summanyLogFDsToCopy);
} finally {
summaryTableCopyLocker.unlock();
}
}
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
try {
entityTableCopyLocker.lock();
return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>>(entityLogFDsToCopy);
} finally {
entityTableCopyLocker.unlock();
}
}
private void flushSummaryFDMap(Map<ApplicationAttemptId,
EntityLogFD> logFDs) throws IOException {
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
logFD.flush();
} finally {
logFD.unlock();
}
}
}
}
private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
= logFDMapEntry.getValue();
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
logFD.flush();
} finally {
logFD.unlock();
}
}
}
}
}
private class FlushTimerTask extends TimerTask {
@Override
public void run() {
try {
flush();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug(e);
}
}
}
}
private void cleanInActiveFDs() {
long currentTimeStamp = System.currentTimeMillis();
try {
this.domainFDLocker.lock();
if (domainLogFD != null) {
if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
domainLogFD.close();
domainLogFD = null;
}
}
} finally {
this.domainFDLocker.unlock();
}
cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summanyLogFDs),
currentTimeStamp);
cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs),
currentTimeStamp);
}
private void cleanInActiveSummaryFDsforMap(
Map<ApplicationAttemptId, EntityLogFD> logFDs,
long currentTimeStamp) {
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
logFD.close();
}
} finally {
logFD.unlock();
}
}
}
}
private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
long currentTimeStamp) {
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, HashMap<
TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
: logFDs.entrySet()) {
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
= logFDMapEntry.getValue();
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
logFD.close();
}
} finally {
logFD.unlock();
}
}
}
}
}
private class CleanInActiveFDsTask extends TimerTask {
@Override
public void run() {
try {
cleanInActiveFDs();
} catch (Exception e) {
LOG.warn(e);
}
}
}
@Override
public void close() throws IOException {
serviceStopped = true;
flushTimer.cancel();
cleanInActiveFDsTimer.cancel();
try {
this.domainFDLocker.lock();
if (domainLogFD != null) {
domainLogFD.close();
domainLogFD = null;
}
} finally {
this.domainFDLocker.unlock();
}
closeSummaryFDs(summanyLogFDs);
closeEntityFDs(entityLogFDs);
}
private void closeEntityFDs(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
try {
entityTableLocker.lock();
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
= logFDMapEntry.getValue();
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
logFD.close();
} finally {
logFD.unlock();
}
}
}
}
} finally {
entityTableLocker.unlock();
}
}
private void closeSummaryFDs(
Map<ApplicationAttemptId, EntityLogFD> logFDs) {
try {
summaryTableLocker.lock();
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
: logFDs.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
try {
logFD.lock();
logFD.close();
} finally {
logFD.unlock();
}
}
}
} finally {
summaryTableLocker.unlock();
}
}
public void writeDomainLog(FileSystem fs, Path logPath,
ObjectMapper objMapper, TimelineDomain domain,
boolean isAppendSupported) throws IOException {
try {
this.domainFDLocker.lock();
if (this.domainLogFD != null) {
this.domainLogFD.writeDomain(domain);
} else {
this.domainLogFD =
new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
this.domainLogFD.writeDomain(domain);
}
} finally {
this.domainFDLocker.unlock();
}
}
public void writeEntityLogs(FileSystem fs, Path entityLogPath,
ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
boolean isAppendSupported) throws IOException{
writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
}
private void writeEntityLogs(FileSystem fs, Path logPath,
ObjectMapper objMapper, ApplicationAttemptId attemptId,
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
HashMap<TimelineEntityGroupId, EntityLogFD>logMapFD
= logFDs.get(attemptId);
if (logMapFD != null) {
EntityLogFD logFD = logMapFD.get(groupId);
if (logFD != null) {
try {
logFD.lock();
if (serviceStopped) {
return;
}
logFD.writeEntities(entities);
} finally {
logFD.unlock();
}
} else {
createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
entities, isAppendSupported, logFDs);
}
} else {
createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
entities, isAppendSupported, logFDs);
}
}
private void createEntityFDandWrite(FileSystem fs, Path logPath,
ObjectMapper objMapper, ApplicationAttemptId attemptId,
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException{
try {
entityTableLocker.lock();
if (serviceStopped) {
return;
}
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
logFDs.get(attemptId);
if (logFDMap == null) {
logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
}
EntityLogFD logFD = logFDMap.get(groupId);
if (logFD == null) {
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
}
try {
logFD.lock();
logFD.writeEntities(entities);
try {
entityTableCopyLocker.lock();
logFDMap.put(groupId, logFD);
logFDs.put(attemptId, logFDMap);
} finally {
entityTableCopyLocker.unlock();
}
} finally {
logFD.unlock();
}
} finally {
entityTableLocker.unlock();
}
}
public void writeSummaryEntityLogs(FileSystem fs, Path logPath,
ObjectMapper objMapper, ApplicationAttemptId attemptId,
List<TimelineEntity> entities, boolean isAppendSupported)
throws IOException {
writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
isAppendSupported, this.summanyLogFDs);
}
private void writeSummmaryEntityLogs(FileSystem fs, Path logPath,
ObjectMapper objMapper, ApplicationAttemptId attemptId,
List<TimelineEntity> entities, boolean isAppendSupported,
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
EntityLogFD logFD = null;
logFD = logFDs.get(attemptId);
if (logFD != null) {
try {
logFD.lock();
if (serviceStopped) {
return;
}
logFD.writeEntities(entities);
} finally {
logFD.unlock();
}
} else {
createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities,
isAppendSupported, logFDs);
}
}
private void createSummaryFDAndWrite(FileSystem fs, Path logPath,
ObjectMapper objMapper, ApplicationAttemptId attemptId,
List<TimelineEntity> entities, boolean isAppendSupported,
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
try {
summaryTableLocker.lock();
if (serviceStopped) {
return;
}
EntityLogFD logFD = logFDs.get(attemptId);
if (logFD == null) {
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
}
try {
logFD.lock();
logFD.writeEntities(entities);
try {
summaryTableCopyLocker.lock();
logFDs.put(attemptId, logFD);
} finally {
summaryTableCopyLocker.unlock();
}
} finally {
logFD.unlock();
}
} finally {
summaryTableLocker.unlock();
}
}
}
}

View File

@ -33,8 +33,6 @@
import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.HelpFormatter;
@ -54,19 +52,19 @@
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -74,7 +72,6 @@
import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest; import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.api.client.filter.ClientFilter;
@ -110,6 +107,9 @@ public class TimelineClientImpl extends TimelineClient {
private URI resURI; private URI resURI;
private UserGroupInformation authUgi; private UserGroupInformation authUgi;
private String doAsUser; private String doAsUser;
private Configuration configuration;
private float timelineServiceVersion;
private TimelineWriter timelineWriter;
@Private @Private
@VisibleForTesting @VisibleForTesting
@ -254,6 +254,7 @@ public TimelineClientImpl() {
} }
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
this.configuration = conf;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser(); UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) { if (realUgi != null) {
@ -293,58 +294,48 @@ protected void serviceInit(Configuration conf) throws Exception {
RESOURCE_URI_STR)); RESOURCE_URI_STR));
} }
LOG.info("Timeline service address: " + resURI); LOG.info("Timeline service address: " + resURI);
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
super.serviceInit(conf); super.serviceInit(conf);
} }
@Override
protected void serviceStart() throws Exception {
timelineWriter = createTimelineWriter(
configuration, authUgi, client, resURI);
}
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation ugi, Client webClient, URI uri)
throws IOException {
if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
return new FileSystemTimelineWriter(
conf, ugi, webClient, uri);
} else {
return new DirectTimelineWriter(ugi, webClient, uri);
}
}
@Override
protected void serviceStop() throws Exception {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
super.serviceStop();
}
@Override @Override
public TimelinePutResponse putEntities( public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException { TimelineEntity... entities) throws IOException, YarnException {
TimelineEntities entitiesContainer = new TimelineEntities(); return timelineWriter.putEntities(entities);
for (TimelineEntity entity : entities) {
if (entity.getEntityId() == null || entity.getEntityType() == null) {
throw new YarnException("Incomplete entity without entity id/type");
}
entitiesContainer.addEntity(entity);
}
ClientResponse resp = doPosting(entitiesContainer, null);
return resp.getEntity(TimelinePutResponse.class);
} }
@Override @Override
public void putDomain(TimelineDomain domain) throws IOException, public void putDomain(TimelineDomain domain) throws IOException,
YarnException { YarnException {
doPosting(domain, "domain"); timelineWriter.putDomain(domain);
}
private ClientResponse doPosting(final Object obj, final String path)
throws IOException, YarnException {
ClientResponse resp;
try {
resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
@Override
public ClientResponse run() throws Exception {
return doPostingObject(obj, path);
}
});
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException ie) {
throw new IOException(ie);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
if (LOG.isDebugEnabled() && resp != null) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response : \n" + output);
}
throw new YarnException(msg);
}
return resp;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -470,23 +461,6 @@ public boolean shouldRetryOn(Exception e) {
return connectionRetry.retryOn(tokenRetryOp); return connectionRetry.retryOn(tokenRetryOp);
} }
@Private
@VisibleForTesting
public ClientResponse doPostingObject(Object object, String path) {
WebResource webResource = client.resource(resURI);
if (path == null) {
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, object);
} else if (path.equals("domain")) {
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, object);
} else {
throw new YarnRuntimeException("Unknown resource type");
}
}
private class TimelineURLConnectionFactory private class TimelineURLConnectionFactory
implements HttpURLConnectionFactory { implements HttpURLConnectionFactory {
@ -663,4 +637,34 @@ private static void printUsage() {
public UserGroupInformation getUgi() { public UserGroupInformation getUgi() {
return authUgi; return authUgi;
} }
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
TimelineEntityGroupId groupId, TimelineEntity... entities)
throws IOException, YarnException {
if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
throw new YarnException(
"This API is not supported under current Timeline Service Version: "
+ timelineServiceVersion);
}
return timelineWriter.putEntities(appAttemptId, groupId, entities);
}
@Override
public void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException {
if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
throw new YarnException(
"This API is not supported under current Timeline Service Version: "
+ timelineServiceVersion);
}
timelineWriter.putDomain(appAttemptId, domain);
}
@Private
@VisibleForTesting
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
} }

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import javax.ws.rs.core.MediaType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
/**
* Base writer class to write the Timeline data.
*/
@Private
@Unstable
public abstract class TimelineWriter {
private static final Log LOG = LogFactory
.getLog(TimelineWriter.class);
private UserGroupInformation authUgi;
private Client client;
private URI resURI;
public TimelineWriter(UserGroupInformation authUgi, Client client,
URI resURI) {
this.authUgi = authUgi;
this.client = client;
this.resURI = resURI;
}
public void close() throws Exception {
// DO NOTHING
}
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
TimelineEntities entitiesContainer = new TimelineEntities();
for (TimelineEntity entity : entities) {
if (entity.getEntityId() == null || entity.getEntityType() == null) {
throw new YarnException("Incomplete entity without entity id/type");
}
entitiesContainer.addEntity(entity);
}
ClientResponse resp = doPosting(entitiesContainer, null);
return resp.getEntity(TimelinePutResponse.class);
}
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
doPosting(domain, "domain");
}
public abstract TimelinePutResponse putEntities(
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
TimelineEntity... entities) throws IOException, YarnException;
public abstract void putDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException, YarnException;
private ClientResponse doPosting(final Object obj, final String path)
throws IOException, YarnException {
ClientResponse resp;
try {
resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
@Override
public ClientResponse run() throws Exception {
return doPostingObject(obj, path);
}
});
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
} catch (InterruptedException ie) {
throw new IOException(ie);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
if (LOG.isDebugEnabled() && resp != null) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response : \n" + output);
}
throw new YarnException(msg);
}
return resp;
}
@Private
@VisibleForTesting
public ClientResponse doPostingObject(Object object, String path) {
WebResource webResource = client.resource(resURI);
if (path == null) {
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, object);
} else if (path.equals("domain")) {
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, object);
} else {
throw new YarnRuntimeException("Unknown resource type");
}
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.junit.Assert;
import org.junit.Test;
public class TestTimelineEntityGroupId {
@Test
public void testTimelineEntityGroupId() {
ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");
Assert.assertTrue(group1.equals(group4));
Assert.assertFalse(group1.equals(group2));
Assert.assertFalse(group1.equals(group3));
Assert.assertTrue(group1.compareTo(group4) == 0);
Assert.assertTrue(group1.compareTo(group2) < 0);
Assert.assertTrue(group1.compareTo(group3) < 0);
Assert.assertTrue(group1.hashCode() == group4.hashCode());
Assert.assertFalse(group1.hashCode() == group2.hashCode());
Assert.assertFalse(group1.hashCode() == group3.hashCode());
Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
}
}

View File

@ -25,8 +25,11 @@
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -37,7 +40,6 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@ -46,17 +48,20 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineClient { public class TestTimelineClient {
private TimelineClientImpl client; private TimelineClientImpl client;
private TimelineWriter spyTimelineWriter;
@Before @Before
public void setup() { public void setup() {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
client = createTimelineClient(conf); client = createTimelineClient(conf);
} }
@ -69,7 +74,8 @@ public void tearDown() {
@Test @Test
public void testPostEntities() throws Exception { public void testPostEntities() throws Exception {
mockEntityClientResponse(client, ClientResponse.Status.OK, false, false); mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK,
false, false);
try { try {
TimelinePutResponse response = client.putEntities(generateEntity()); TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(0, response.getErrors().size()); Assert.assertEquals(0, response.getErrors().size());
@ -80,7 +86,8 @@ public void testPostEntities() throws Exception {
@Test @Test
public void testPostEntitiesWithError() throws Exception { public void testPostEntitiesWithError() throws Exception {
mockEntityClientResponse(client, ClientResponse.Status.OK, true, false); mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, true,
false);
try { try {
TimelinePutResponse response = client.putEntities(generateEntity()); TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(1, response.getErrors().size()); Assert.assertEquals(1, response.getErrors().size());
@ -106,8 +113,8 @@ public void testPostIncompleteEntities() throws Exception {
@Test @Test
public void testPostEntitiesNoResponse() throws Exception { public void testPostEntitiesNoResponse() throws Exception {
mockEntityClientResponse( mockEntityClientResponse(spyTimelineWriter,
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
try { try {
client.putEntities(generateEntity()); client.putEntities(generateEntity());
Assert.fail("Exception is expected"); Assert.fail("Exception is expected");
@ -119,7 +126,7 @@ public void testPostEntitiesNoResponse() throws Exception {
@Test @Test
public void testPostEntitiesConnectionRefused() throws Exception { public void testPostEntitiesConnectionRefused() throws Exception {
mockEntityClientResponse(client, null, false, true); mockEntityClientResponse(spyTimelineWriter, null, false, true);
try { try {
client.putEntities(generateEntity()); client.putEntities(generateEntity());
Assert.fail("RuntimeException is expected"); Assert.fail("RuntimeException is expected");
@ -130,7 +137,7 @@ public void testPostEntitiesConnectionRefused() throws Exception {
@Test @Test
public void testPutDomain() throws Exception { public void testPutDomain() throws Exception {
mockDomainClientResponse(client, ClientResponse.Status.OK, false); mockDomainClientResponse(spyTimelineWriter, ClientResponse.Status.OK, false);
try { try {
client.putDomain(generateDomain()); client.putDomain(generateDomain());
} catch (YarnException e) { } catch (YarnException e) {
@ -140,7 +147,8 @@ public void testPutDomain() throws Exception {
@Test @Test
public void testPutDomainNoResponse() throws Exception { public void testPutDomainNoResponse() throws Exception {
mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false); mockDomainClientResponse(spyTimelineWriter,
ClientResponse.Status.FORBIDDEN, false);
try { try {
client.putDomain(generateDomain()); client.putDomain(generateDomain());
Assert.fail("Exception is expected"); Assert.fail("Exception is expected");
@ -152,7 +160,7 @@ public void testPutDomainNoResponse() throws Exception {
@Test @Test
public void testPutDomainConnectionRefused() throws Exception { public void testPutDomainConnectionRefused() throws Exception {
mockDomainClientResponse(client, null, true); mockDomainClientResponse(spyTimelineWriter, null, true);
try { try {
client.putDomain(generateDomain()); client.putDomain(generateDomain());
Assert.fail("RuntimeException is expected"); Assert.fail("RuntimeException is expected");
@ -291,15 +299,16 @@ private void assertException(TimelineClientImpl client, RuntimeException ce) {
} }
private static ClientResponse mockEntityClientResponse( private static ClientResponse mockEntityClientResponse(
TimelineClientImpl client, ClientResponse.Status status, TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasError, boolean hasRuntimeError) { boolean hasError, boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
if (hasRuntimeError) { if (hasRuntimeError) {
doThrow(new ClientHandlerException(new ConnectException())).when(client) doThrow(new ClientHandlerException(new ConnectException())).when(
.doPostingObject(any(TimelineEntities.class), any(String.class)); spyTimelineWriter).doPostingObject(
any(TimelineEntities.class), any(String.class));
return response; return response;
} }
doReturn(response).when(client) doReturn(response).when(spyTimelineWriter)
.doPostingObject(any(TimelineEntities.class), any(String.class)); .doPostingObject(any(TimelineEntities.class), any(String.class));
when(response.getClientResponseStatus()).thenReturn(status); when(response.getClientResponseStatus()).thenReturn(status);
TimelinePutResponse.TimelinePutError error = TimelinePutResponse.TimelinePutError error =
@ -316,15 +325,16 @@ private static ClientResponse mockEntityClientResponse(
} }
private static ClientResponse mockDomainClientResponse( private static ClientResponse mockDomainClientResponse(
TimelineClientImpl client, ClientResponse.Status status, TimelineWriter spyTimelineWriter, ClientResponse.Status status,
boolean hasRuntimeError) { boolean hasRuntimeError) {
ClientResponse response = mock(ClientResponse.class); ClientResponse response = mock(ClientResponse.class);
if (hasRuntimeError) { if (hasRuntimeError) {
doThrow(new ClientHandlerException(new ConnectException())).when(client) doThrow(new ClientHandlerException(new ConnectException())).when(
.doPostingObject(any(TimelineDomain.class), any(String.class)); spyTimelineWriter).doPostingObject(any(TimelineDomain.class),
any(String.class));
return response; return response;
} }
doReturn(response).when(client) doReturn(response).when(spyTimelineWriter)
.doPostingObject(any(TimelineDomain.class), any(String.class)); .doPostingObject(any(TimelineDomain.class), any(String.class));
when(response.getClientResponseStatus()).thenReturn(status); when(response.getClientResponseStatus()).thenReturn(status);
return response; return response;
@ -365,10 +375,19 @@ public static TimelineDomain generateDomain() {
return domain; return domain;
} }
private static TimelineClientImpl createTimelineClient( private TimelineClientImpl createTimelineClient(
YarnConfiguration conf) { YarnConfiguration conf) {
TimelineClientImpl client = TimelineClientImpl client = new TimelineClientImpl() {
spy((TimelineClientImpl) TimelineClient.createTimelineClient()); @Override
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation authUgi, Client client, URI resURI)
throws IOException {
TimelineWriter timelineWriter =
new DirectTimelineWriter(authUgi, client, resURI);
spyTimelineWriter = spy(timelineWriter);
return spyTimelineWriter;
}
};
client.init(conf); client.init(conf);
client.start(); client.start();
return client; return client;

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.reset;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineClientForATS1_5 {
protected static Log LOG = LogFactory
.getLog(TestTimelineClientForATS1_5.class);
private TimelineClientImpl client;
private static FileContext localFS;
private static File localActiveDir;
private TimelineWriter spyTimelineWriter;
@Before
public void setup() throws Exception {
localFS = FileContext.getLocalFSFileContext();
localActiveDir =
new File("target", this.getClass().getSimpleName() + "-activeDir")
.getAbsoluteFile();
localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
localActiveDir.mkdir();
LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
localActiveDir.getAbsolutePath());
conf.set(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"summary_type");
client = createTimelineClient(conf);
}
@After
public void tearDown() throws Exception {
if (client != null) {
client.stop();
}
localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
}
@Test
public void testPostEntities() throws Exception {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
TimelineEntityGroupId groupId =
TimelineEntityGroupId.newInstance(appId, "1");
TimelineEntityGroupId groupId2 =
TimelineEntityGroupId.newInstance(appId, "2");
// Create two entities, includes an entity type and a summary type
TimelineEntity[] entities = new TimelineEntity[2];
entities[0] = generateEntity("entity_type");
entities[1] = generateEntity("summary_type");
try {
// if attemptid is null, fall back to the original putEntities call, and
// save the entity
// into configured levelDB store
client.putEntities(null, null, entities);
verify(spyTimelineWriter, times(1)).putEntities(entities);
reset(spyTimelineWriter);
// if the attemptId is specified, but groupId is given as null, it would
// fall back to the original putEntities call if we have the entity type.
// the entity which is summary type would be written into FS
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId, 1);
client.putEntities(attemptId1, null, entities);
TimelineEntity[] entityTDB = new TimelineEntity[1];
entityTDB[0] = entities[0];
verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId1), "summarylog-"
+ attemptId1.toString())));
reset(spyTimelineWriter);
// if we specified attemptId as well as groupId, it would save the entity
// into
// FileSystem instead of levelDB store
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId, 2);
client.putEntities(attemptId2, groupId, entities);
client.putEntities(attemptId2, groupId2, entities);
verify(spyTimelineWriter, times(0)).putEntities(
any(TimelineEntity[].class));
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId2), "summarylog-"
+ attemptId2.toString())));
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ groupId.toString())));
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ groupId2.toString())));
reset(spyTimelineWriter);
} catch (Exception e) {
Assert.fail("Exception is not expected. " + e);
}
}
@Test
public void testPutDomain() {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId, 1);
try {
TimelineDomain domain = generateDomain();
client.putDomain(null, domain);
verify(spyTimelineWriter, times(1)).putDomain(domain);
reset(spyTimelineWriter);
client.putDomain(attemptId1, domain);
verify(spyTimelineWriter, times(0)).putDomain(domain);
Assert.assertTrue(localFS.util().exists(
new Path(getAppAttemptDir(attemptId1), "domainlog-"
+ attemptId1.toString())));
reset(spyTimelineWriter);
} catch (Exception e) {
Assert.fail("Exception is not expected." + e);
}
}
private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
Path appDir =
new Path(localActiveDir.getAbsolutePath(), appAttemptId
.getApplicationId().toString());
Path attemptDir = new Path(appDir, appAttemptId.toString());
return attemptDir;
}
private static TimelineEntity generateEntity(String type) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId("entity id");
entity.setEntityType(type);
entity.setStartTime(System.currentTimeMillis());
return entity;
}
private static TimelineDomain generateDomain() {
TimelineDomain domain = new TimelineDomain();
domain.setId("namesapce id");
domain.setDescription("domain description");
domain.setOwner("domain owner");
domain.setReaders("domain_reader");
domain.setWriters("domain_writer");
domain.setCreatedTime(0L);
domain.setModifiedTime(1L);
return domain;
}
private TimelineClientImpl createTimelineClient(YarnConfiguration conf) {
TimelineClientImpl client = new TimelineClientImpl() {
@Override
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation authUgi, Client client, URI resURI)
throws IOException {
TimelineWriter timelineWriter =
new FileSystemTimelineWriter(conf, authUgi, client, resURI) {
public ClientResponse doPostingObject(Object object, String path) {
ClientResponse response = mock(ClientResponse.class);
when(response.getClientResponseStatus()).thenReturn(
ClientResponse.Status.OK);
return response;
}
};
spyTimelineWriter = spy(timelineWriter);
return spyTimelineWriter;
}
};
client.init(conf);
client.start();
return client;
}
}

View File

@ -19,15 +19,20 @@
package org.apache.hadoop.yarn.server.timeline.webapp; package org.apache.hadoop.yarn.server.timeline.webapp;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.EnumSet; import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
@ -39,6 +44,7 @@
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineWebServicesWithSSL { public class TestTimelineWebServicesWithSSL {
@ -60,6 +66,7 @@ public static void setupServer() throws Exception {
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class); MemoryTimelineStore.class, TimelineStore.class);
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY"); conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
File base = new File(BASEDIR); File base = new File(BASEDIR);
FileUtil.fullyDelete(base); FileUtil.fullyDelete(base);
@ -122,12 +129,18 @@ private static class TestTimelineClient extends TimelineClientImpl {
private ClientResponse resp; private ClientResponse resp;
@Override
protected TimelineWriter createTimelineWriter(Configuration conf,
UserGroupInformation authUgi, Client client, URI resURI)
throws IOException {
return new DirectTimelineWriter(authUgi, client, resURI) {
@Override @Override
public ClientResponse doPostingObject(Object obj, String path) { public ClientResponse doPostingObject(Object obj, String path) {
resp = super.doPostingObject(obj, path); resp = super.doPostingObject(obj, path);
return resp; return resp;
} }
};
}
} }
} }

222
q Normal file
View File

@ -0,0 +1,222 @@
SSUUMMMMAARRYY OOFF LLEESSSS CCOOMMMMAANNDDSS
Commands marked with * may be preceded by a number, _N.
Notes in parentheses indicate the behavior if _N is given.
h H Display this help.
q :q Q :Q ZZ Exit.
---------------------------------------------------------------------------
MMOOVVIINNGG
e ^E j ^N CR * Forward one line (or _N lines).
y ^Y k ^K ^P * Backward one line (or _N lines).
f ^F ^V SPACE * Forward one window (or _N lines).
b ^B ESC-v * Backward one window (or _N lines).
z * Forward one window (and set window to _N).
w * Backward one window (and set window to _N).
ESC-SPACE * Forward one window, but don't stop at end-of-file.
d ^D * Forward one half-window (and set half-window to _N).
u ^U * Backward one half-window (and set half-window to _N).
ESC-) RightArrow * Left one half screen width (or _N positions).
ESC-( LeftArrow * Right one half screen width (or _N positions).
F Forward forever; like "tail -f".
r ^R ^L Repaint screen.
R Repaint screen, discarding buffered input.
---------------------------------------------------
Default "window" is the screen height.
Default "half-window" is half of the screen height.
---------------------------------------------------------------------------
SSEEAARRCCHHIINNGG
/_p_a_t_t_e_r_n * Search forward for (_N-th) matching line.
?_p_a_t_t_e_r_n * Search backward for (_N-th) matching line.
n * Repeat previous search (for _N-th occurrence).
N * Repeat previous search in reverse direction.
ESC-n * Repeat previous search, spanning files.
ESC-N * Repeat previous search, reverse dir. & spanning files.
ESC-u Undo (toggle) search highlighting.
&_p_a_t_t_e_r_n * Display only matching lines
---------------------------------------------------
Search patterns may be modified by one or more of:
^N or ! Search for NON-matching lines.
^E or * Search multiple files (pass thru END OF FILE).
^F or @ Start search at FIRST file (for /) or last file (for ?).
^K Highlight matches, but don't move (KEEP position).
^R Don't use REGULAR EXPRESSIONS.
---------------------------------------------------------------------------
JJUUMMPPIINNGG
g < ESC-< * Go to first line in file (or line _N).
G > ESC-> * Go to last line in file (or line _N).
p % * Go to beginning of file (or _N percent into file).
t * Go to the (_N-th) next tag.
T * Go to the (_N-th) previous tag.
{ ( [ * Find close bracket } ) ].
} ) ] * Find open bracket { ( [.
ESC-^F _<_c_1_> _<_c_2_> * Find close bracket _<_c_2_>.
ESC-^B _<_c_1_> _<_c_2_> * Find open bracket _<_c_1_>
---------------------------------------------------
Each "find close bracket" command goes forward to the close bracket
matching the (_N-th) open bracket in the top line.
Each "find open bracket" command goes backward to the open bracket
matching the (_N-th) close bracket in the bottom line.
m_<_l_e_t_t_e_r_> Mark the current position with <letter>.
'_<_l_e_t_t_e_r_> Go to a previously marked position.
'' Go to the previous position.
^X^X Same as '.
---------------------------------------------------
A mark is any upper-case or lower-case letter.
Certain marks are predefined:
^ means beginning of the file
$ means end of the file
---------------------------------------------------------------------------
CCHHAANNGGIINNGG FFIILLEESS
:e [_f_i_l_e] Examine a new file.
^X^V Same as :e.
:n * Examine the (_N-th) next file from the command line.
:p * Examine the (_N-th) previous file from the command line.
:x * Examine the first (or _N-th) file from the command line.
:d Delete the current file from the command line list.
= ^G :f Print current file name.
---------------------------------------------------------------------------
MMIISSCCEELLLLAANNEEOOUUSS CCOOMMMMAANNDDSS
-_<_f_l_a_g_> Toggle a command line option [see OPTIONS below].
--_<_n_a_m_e_> Toggle a command line option, by name.
__<_f_l_a_g_> Display the setting of a command line option.
___<_n_a_m_e_> Display the setting of an option, by name.
+_c_m_d Execute the less cmd each time a new file is examined.
!_c_o_m_m_a_n_d Execute the shell command with $SHELL.
|XX_c_o_m_m_a_n_d Pipe file between current pos & mark XX to shell command.
v Edit the current file with $VISUAL or $EDITOR.
V Print version number of "less".
---------------------------------------------------------------------------
OOPPTTIIOONNSS
Most options may be changed either on the command line,
or from within less by using the - or -- command.
Options may be given in one of two forms: either a single
character preceded by a -, or a name preceeded by --.
-? ........ --help
Display help (from command line).
-a ........ --search-skip-screen
Forward search skips current screen.
-b [_N] .... --buffers=[_N]
Number of buffers.
-B ........ --auto-buffers
Don't automatically allocate buffers for pipes.
-c ........ --clear-screen
Repaint by clearing rather than scrolling.
-d ........ --dumb
Dumb terminal.
-D [_x_n_._n] . --color=_x_n_._n
Set screen colors. (MS-DOS only)
-e -E .... --quit-at-eof --QUIT-AT-EOF
Quit at end of file.
-f ........ --force
Force open non-regular files.
-F ........ --quit-if-one-screen
Quit if entire file fits on first screen.
-g ........ --hilite-search
Highlight only last match for searches.
-G ........ --HILITE-SEARCH
Don't highlight any matches for searches.
-h [_N] .... --max-back-scroll=[_N]
Backward scroll limit.
-i ........ --ignore-case
Ignore case in searches that do not contain uppercase.
-I ........ --IGNORE-CASE
Ignore case in all searches.
-j [_N] .... --jump-target=[_N]
Screen position of target lines.
-J ........ --status-column
Display a status column at left edge of screen.
-k [_f_i_l_e] . --lesskey-file=[_f_i_l_e]
Use a lesskey file.
-L ........ --no-lessopen
Ignore the LESSOPEN environment variable.
-m -M .... --long-prompt --LONG-PROMPT
Set prompt style.
-n -N .... --line-numbers --LINE-NUMBERS
Don't use line numbers.
-o [_f_i_l_e] . --log-file=[_f_i_l_e]
Copy to log file (standard input only).
-O [_f_i_l_e] . --LOG-FILE=[_f_i_l_e]
Copy to log file (unconditionally overwrite).
-p [_p_a_t_t_e_r_n] --pattern=[_p_a_t_t_e_r_n]
Start at pattern (from command line).
-P [_p_r_o_m_p_t] --prompt=[_p_r_o_m_p_t]
Define new prompt.
-q -Q .... --quiet --QUIET --silent --SILENT
Quiet the terminal bell.
-r -R .... --raw-control-chars --RAW-CONTROL-CHARS
Output "raw" control characters.
-s ........ --squeeze-blank-lines
Squeeze multiple blank lines.
-S ........ --chop-long-lines
Chop long lines.
-t [_t_a_g] .. --tag=[_t_a_g]
Find a tag.
-T [_t_a_g_s_f_i_l_e] --tag-file=[_t_a_g_s_f_i_l_e]
Use an alternate tags file.
-u -U .... --underline-special --UNDERLINE-SPECIAL
Change handling of backspaces.
-V ........ --version
Display the version number of "less".
-w ........ --hilite-unread
Highlight first new line after forward-screen.
-W ........ --HILITE-UNREAD
Highlight first new line after any forward movement.
-x [_N[,...]] --tabs=[_N[,...]]
Set tab stops.
-X ........ --no-init
Don't use termcap init/deinit strings.
--no-keypad
Don't use termcap keypad init/deinit strings.
-y [_N] .... --max-forw-scroll=[_N]
Forward scroll limit.
-z [_N] .... --window=[_N]
Set size of window.
-" [_c[_c]] . --quotes=[_c[_c]]
Set shell quote characters.
-~ ........ --tilde
Don't display tildes after end of file.
-# [_N] .... --shift=[_N]
Horizontal scroll amount (0 = one half screen width)
---------------------------------------------------------------------------
LLIINNEE EEDDIITTIINNGG
These keys can be used to edit text being entered
on the "command line" at the bottom of the screen.
RightArrow ESC-l Move cursor right one character.
LeftArrow ESC-h Move cursor left one character.
CNTL-RightArrow ESC-RightArrow ESC-w Move cursor right one word.
CNTL-LeftArrow ESC-LeftArrow ESC-b Move cursor left one word.
HOME ESC-0 Move cursor to start of line.
END ESC-$ Move cursor to end of line.
BACKSPACE Delete char to left of cursor.
DELETE ESC-x Delete char under cursor.
CNTL-BACKSPACE ESC-BACKSPACE Delete word to left of cursor.
CNTL-DELETE ESC-DELETE ESC-X Delete word under cursor.
CNTL-U ESC (MS-DOS only) Delete entire line.
UpArrow ESC-k Retrieve previous command line.
DownArrow ESC-j Retrieve next command line.
TAB Complete filename & cycle.
SHIFT-TAB ESC-TAB Complete filename & reverse cycle.
CNTL-L Complete filename, list all.