YARN-4234. New put APIs in TimelineClient for ats v1.5. Contributed by Xuan Gong.
This commit is contained in:
parent
8c180a13c8
commit
882f2f0464
|
@ -288,6 +288,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
YARN-3458. CPU resource monitoring in Windows. (Inigo Goiri via cnauroth)
|
||||
|
||||
YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
|
||||
junping_du)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.timeline;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import com.google.common.base.Splitter;
|
||||
|
||||
/**
|
||||
* <p><code>TimelineEntityGroupId</code> is an abstract way for
|
||||
* timeline service users to represent “a group of related timeline data.
|
||||
* For example, all entities that represents one data flow DAG execution
|
||||
* can be grouped into one timeline entity group. </p>
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public class TimelineEntityGroupId implements
|
||||
Comparable<TimelineEntityGroupId> {
|
||||
|
||||
private static final Splitter SPLITTER = Splitter.on('_').trimResults();
|
||||
|
||||
private ApplicationId applicationId;
|
||||
private String id;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static final String TIMELINE_ENTITY_GROUPID_STR_PREFIX =
|
||||
"timelineEntityGroupId";
|
||||
|
||||
public TimelineEntityGroupId() {
|
||||
|
||||
}
|
||||
|
||||
public static TimelineEntityGroupId newInstance(ApplicationId applicationId,
|
||||
String id) {
|
||||
TimelineEntityGroupId timelineEntityGroupId =
|
||||
new TimelineEntityGroupId();
|
||||
timelineEntityGroupId.setApplicationId(applicationId);
|
||||
timelineEntityGroupId.setTimelineEntityGroupId(id);
|
||||
return timelineEntityGroupId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the <code>ApplicationId</code> of the
|
||||
* <code>TimelineEntityGroupId</code>.
|
||||
*
|
||||
* @return <code>ApplicationId</code> of the
|
||||
* <code>TimelineEntityGroupId</code>
|
||||
*/
|
||||
public ApplicationId getApplicationId() {
|
||||
return this.applicationId;
|
||||
}
|
||||
|
||||
public void setApplicationId(ApplicationId appID) {
|
||||
this.applicationId = appID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the <code>timelineEntityGroupId</code>.
|
||||
*
|
||||
* @return <code>timelineEntityGroupId</code>
|
||||
*/
|
||||
public String getTimelineEntityGroupId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
protected void setTimelineEntityGroupId(String timelineEntityGroupId) {
|
||||
this.id = timelineEntityGroupId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = getTimelineEntityGroupId().hashCode();
|
||||
result = 31 * result + getApplicationId().hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TimelineEntityGroupId otherObject = (TimelineEntityGroupId) obj;
|
||||
if (!this.getApplicationId().equals(otherObject.getApplicationId())) {
|
||||
return false;
|
||||
}
|
||||
if (!this.getTimelineEntityGroupId().equals(
|
||||
otherObject.getTimelineEntityGroupId())) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(TimelineEntityGroupId other) {
|
||||
int compareAppIds =
|
||||
this.getApplicationId().compareTo(other.getApplicationId());
|
||||
if (compareAppIds == 0) {
|
||||
return this.getTimelineEntityGroupId().compareTo(
|
||||
other.getTimelineEntityGroupId());
|
||||
} else {
|
||||
return compareAppIds;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(TIMELINE_ENTITY_GROUPID_STR_PREFIX + "_");
|
||||
ApplicationId appId = getApplicationId();
|
||||
sb.append(appId.getClusterTimestamp()).append("_");
|
||||
sb.append(appId.getId()).append("_");
|
||||
sb.append(getTimelineEntityGroupId());
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static TimelineEntityGroupId
|
||||
fromString(String timelineEntityGroupIdStr) {
|
||||
StringBuffer buf = new StringBuffer();
|
||||
Iterator<String> it = SPLITTER.split(timelineEntityGroupIdStr).iterator();
|
||||
if (!it.next().equals(TIMELINE_ENTITY_GROUPID_STR_PREFIX)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid TimelineEntityGroupId prefix: " + timelineEntityGroupIdStr);
|
||||
}
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(Long.parseLong(it.next()),
|
||||
Integer.parseInt(it.next()));
|
||||
buf.append(it.next());
|
||||
while (it.hasNext()) {
|
||||
buf.append("_");
|
||||
buf.append(it.next());
|
||||
}
|
||||
return TimelineEntityGroupId.newInstance(appId, buf.toString());
|
||||
}
|
||||
}
|
|
@ -1581,6 +1581,10 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
|
||||
TIMELINE_SERVICE_PREFIX + "ui-web-path.";
|
||||
|
||||
/** Timeline client settings */
|
||||
public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
|
||||
TIMELINE_SERVICE_PREFIX + "client.";
|
||||
|
||||
/**
|
||||
* Path to war file or static content directory for this UI
|
||||
* (For pluggable UIs).
|
||||
|
@ -1588,6 +1592,45 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
|
||||
TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
|
||||
|
||||
/**
|
||||
* The setting for timeline service v1.5
|
||||
*/
|
||||
public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX =
|
||||
TIMELINE_SERVICE_PREFIX + "entity-group-fs-store.";
|
||||
|
||||
public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR =
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "active-dir";
|
||||
|
||||
public static final String
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
|
||||
"/tmp/entity-file-history/active";
|
||||
|
||||
public static final String
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
|
||||
public static final String
|
||||
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
|
||||
"2000, 500";
|
||||
|
||||
public static final String
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
|
||||
|
||||
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
|
||||
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
|
||||
public static final long
|
||||
TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10;
|
||||
|
||||
public static final String TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS =
|
||||
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-clean-interval-secs";
|
||||
public static final long
|
||||
TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60;
|
||||
|
||||
public static final String TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS =
|
||||
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-retain-secs";
|
||||
public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
|
||||
5*60;
|
||||
|
||||
// mark app-history related configs @Private as application history is going
|
||||
// to be integrated into the timeline service
|
||||
@Private
|
||||
|
@ -1628,8 +1671,8 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
|
||||
APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type";
|
||||
@Private
|
||||
public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
|
||||
"none";
|
||||
public static final String
|
||||
DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = "none";
|
||||
|
||||
/** The setting that controls whether timeline service is enabled or not. */
|
||||
public static final String TIMELINE_SERVICE_ENABLED =
|
||||
|
@ -1678,7 +1721,7 @@ public class YarnConfiguration extends Configuration {
|
|||
APPLICATION_HISTORY_PREFIX + "max-applications";
|
||||
public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000;
|
||||
|
||||
/** Timeline service store class */
|
||||
/** Timeline service store class. */
|
||||
public static final String TIMELINE_SERVICE_STORE =
|
||||
TIMELINE_SERVICE_PREFIX + "store-class";
|
||||
|
||||
|
@ -1791,10 +1834,6 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final boolean
|
||||
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
|
||||
|
||||
/** Timeline client settings */
|
||||
public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
|
||||
TIMELINE_SERVICE_PREFIX + "client.";
|
||||
|
||||
/** Timeline client call, max retries (-1 means no limit) */
|
||||
public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES =
|
||||
TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries";
|
||||
|
|
|
@ -26,8 +26,10 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -78,6 +80,28 @@ public abstract class TimelineClient extends AbstractService {
|
|||
public abstract TimelinePutResponse putEntities(
|
||||
TimelineEntity... entities) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a number of conceptual entities to the timeline
|
||||
* server. It is a blocking API. The method will not return until it gets the
|
||||
* response from the timeline server.
|
||||
*
|
||||
* This API is only for timeline service v1.5
|
||||
* </p>
|
||||
*
|
||||
* @param appAttemptId {@link ApplicationAttemptId}
|
||||
* @param groupId {@link TimelineEntityGroupId}
|
||||
* @param entities
|
||||
* the collection of {@link TimelineEntity}
|
||||
* @return the error information if the sent entities are not correctly stored
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Public
|
||||
public abstract TimelinePutResponse putEntities(
|
||||
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
|
||||
TimelineEntity... entities) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a domain to the timeline server. It is a
|
||||
|
@ -94,6 +118,25 @@ public abstract class TimelineClient extends AbstractService {
|
|||
public abstract void putDomain(
|
||||
TimelineDomain domain) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a domain to the timeline server. It is a
|
||||
* blocking API. The method will not return until it gets the response from
|
||||
* the timeline server.
|
||||
*
|
||||
* This API is only for timeline service v1.5
|
||||
* </p>
|
||||
*
|
||||
* @param domain
|
||||
* an {@link TimelineDomain} object
|
||||
* @param appAttemptId {@link ApplicationAttemptId}
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Public
|
||||
public abstract void putDomain(ApplicationAttemptId appAttemptId,
|
||||
TimelineDomain domain) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Get a delegation token so as to be able to talk to the timeline server in a
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
|
||||
/**
|
||||
* A simple writer class for storing Timeline data into Leveldb store.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class DirectTimelineWriter extends TimelineWriter{
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(DirectTimelineWriter.class);
|
||||
|
||||
public DirectTimelineWriter(UserGroupInformation authUgi,
|
||||
Client client, URI resURI) {
|
||||
super(authUgi, client, resURI);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
|
||||
TimelineEntityGroupId groupId, TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putDomain(ApplicationAttemptId appAttemptId,
|
||||
TimelineDomain domain) throws IOException, YarnException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,847 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.Flushable;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonGenerator;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.SerializationConfig.Feature;
|
||||
import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
|
||||
import org.codehaus.jackson.util.MinimalPrettyPrinter;
|
||||
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
|
||||
/**
|
||||
* A simple writer class for storing Timeline data in any storage that
|
||||
* implements a basic FileSystem interface.
|
||||
* This writer is used for ATSv1.5.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class FileSystemTimelineWriter extends TimelineWriter{
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(FileSystemTimelineWriter.class);
|
||||
|
||||
// This is temporary solution. The configuration will be deleted once we have
|
||||
// the FileSystem API to check whether append operation is supported or not.
|
||||
private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
|
||||
= YarnConfiguration.TIMELINE_SERVICE_PREFIX
|
||||
+ "entity-file.fs-support-append";
|
||||
|
||||
// App log directory must be readable by group so server can access logs
|
||||
// and writable by group so it can be deleted by server
|
||||
private static final short APP_LOG_DIR_PERMISSIONS = 0770;
|
||||
// Logs must be readable by group so server can access them
|
||||
private static final short FILE_LOG_PERMISSIONS = 0640;
|
||||
private static final String DOMAIN_LOG_PREFIX = "domainlog-";
|
||||
private static final String SUMMARY_LOG_PREFIX = "summarylog-";
|
||||
private static final String ENTITY_LOG_PREFIX = "entitylog-";
|
||||
|
||||
private Path activePath = null;
|
||||
private FileSystem fs = null;
|
||||
private Set<String> summaryEntityTypes;
|
||||
private ObjectMapper objMapper = null;
|
||||
private long flushIntervalSecs;
|
||||
private long cleanIntervalSecs;
|
||||
private long ttl;
|
||||
private LogFDsCache logFDsCache = null;
|
||||
private boolean isAppendSupported;
|
||||
|
||||
public FileSystemTimelineWriter(Configuration conf,
|
||||
UserGroupInformation authUgi, Client client, URI resURI)
|
||||
throws IOException {
|
||||
super(authUgi, client, resURI);
|
||||
|
||||
Configuration fsConf = new Configuration(conf);
|
||||
fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
|
||||
String retryPolicy =
|
||||
fsConf.get(YarnConfiguration.
|
||||
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
|
||||
YarnConfiguration.
|
||||
DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
|
||||
fsConf.set("dfs.client.retry.policy.spec", retryPolicy);
|
||||
|
||||
activePath = new Path(fsConf.get(
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
|
||||
|
||||
String scheme = activePath.toUri().getScheme();
|
||||
if (scheme == null) {
|
||||
scheme = FileSystem.getDefaultUri(fsConf).getScheme();
|
||||
}
|
||||
if (scheme != null) {
|
||||
String disableCacheName = String.format("fs.%s.impl.disable.cache",
|
||||
scheme);
|
||||
fsConf.setBoolean(disableCacheName, true);
|
||||
}
|
||||
|
||||
fs = activePath.getFileSystem(fsConf);
|
||||
if (!fs.exists(activePath)) {
|
||||
throw new IOException(activePath + " does not exist");
|
||||
}
|
||||
|
||||
summaryEntityTypes = new HashSet<String>(
|
||||
conf.getStringCollection(YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES));
|
||||
|
||||
flushIntervalSecs = conf.getLong(
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT);
|
||||
|
||||
cleanIntervalSecs = conf.getLong(
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS,
|
||||
YarnConfiguration
|
||||
.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT);
|
||||
|
||||
ttl = conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
|
||||
|
||||
logFDsCache =
|
||||
new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
|
||||
|
||||
this.isAppendSupported =
|
||||
conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
|
||||
|
||||
objMapper = createObjectMapper();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
StringBuilder debugMSG = new StringBuilder();
|
||||
debugMSG.append(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS
|
||||
+ "=" + flushIntervalSecs + ", " +
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS
|
||||
+ "=" + cleanIntervalSecs + ", " +
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
|
||||
+ "=" + ttl + ", " +
|
||||
TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
|
||||
+ "=" + isAppendSupported + ", " +
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
|
||||
+ "=" + activePath);
|
||||
|
||||
if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) {
|
||||
debugMSG.append(", " + YarnConfiguration
|
||||
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES
|
||||
+ " = " + summaryEntityTypes);
|
||||
}
|
||||
LOG.debug(debugMSG.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelinePutResponse putEntities(
|
||||
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
|
||||
TimelineEntity... entities) throws IOException, YarnException {
|
||||
if (appAttemptId == null) {
|
||||
return putEntities(entities);
|
||||
}
|
||||
|
||||
List<TimelineEntity> entitiesToDBStore = new ArrayList<TimelineEntity>();
|
||||
List<TimelineEntity> entitiesToSummaryCache
|
||||
= new ArrayList<TimelineEntity>();
|
||||
List<TimelineEntity> entitiesToEntityCache
|
||||
= new ArrayList<TimelineEntity>();
|
||||
Path attemptDir = createAttemptDir(appAttemptId);
|
||||
|
||||
for (TimelineEntity entity : entities) {
|
||||
if (summaryEntityTypes.contains(entity.getEntityType())) {
|
||||
entitiesToSummaryCache.add(entity);
|
||||
} else {
|
||||
if (groupId != null) {
|
||||
entitiesToEntityCache.add(entity);
|
||||
} else {
|
||||
entitiesToDBStore.add(entity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!entitiesToSummaryCache.isEmpty()) {
|
||||
Path summaryLogPath =
|
||||
new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Writing summary log for " + appAttemptId.toString() + " to "
|
||||
+ summaryLogPath);
|
||||
}
|
||||
this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper,
|
||||
appAttemptId, entitiesToSummaryCache, isAppendSupported);
|
||||
}
|
||||
|
||||
if (!entitiesToEntityCache.isEmpty()) {
|
||||
Path entityLogPath =
|
||||
new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Writing entity log for " + groupId.toString() + " to "
|
||||
+ entityLogPath);
|
||||
}
|
||||
this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper,
|
||||
appAttemptId, groupId, entitiesToEntityCache, isAppendSupported);
|
||||
}
|
||||
|
||||
if (!entitiesToDBStore.isEmpty()) {
|
||||
putEntities(entitiesToDBStore.toArray(
|
||||
new TimelineEntity[entitiesToDBStore.size()]));
|
||||
}
|
||||
|
||||
return new TimelinePutResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putDomain(ApplicationAttemptId appAttemptId,
|
||||
TimelineDomain domain) throws IOException, YarnException {
|
||||
if (appAttemptId == null) {
|
||||
putDomain(domain);
|
||||
} else {
|
||||
writeDomain(appAttemptId, domain);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (this.logFDsCache != null) {
|
||||
this.logFDsCache.close();
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectMapper createObjectMapper() {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
|
||||
mapper.setSerializationInclusion(Inclusion.NON_NULL);
|
||||
mapper.configure(Feature.CLOSE_CLOSEABLE, false);
|
||||
return mapper;
|
||||
}
|
||||
|
||||
private Path createAttemptDir(ApplicationAttemptId appAttemptId)
|
||||
throws IOException {
|
||||
Path appDir = createApplicationDir(appAttemptId.getApplicationId());
|
||||
|
||||
Path attemptDir = new Path(appDir, appAttemptId.toString());
|
||||
if (!fs.exists(attemptDir)) {
|
||||
FileSystem.mkdirs(fs, attemptDir, new FsPermission(
|
||||
APP_LOG_DIR_PERMISSIONS));
|
||||
}
|
||||
return attemptDir;
|
||||
}
|
||||
|
||||
private Path createApplicationDir(ApplicationId appId) throws IOException {
|
||||
Path appDir =
|
||||
new Path(activePath, appId.toString());
|
||||
if (!fs.exists(appDir)) {
|
||||
FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
|
||||
}
|
||||
return appDir;
|
||||
}
|
||||
|
||||
private void writeDomain(ApplicationAttemptId appAttemptId,
|
||||
TimelineDomain domain) throws IOException {
|
||||
Path domainLogPath =
|
||||
new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
|
||||
+ appAttemptId.toString());
|
||||
LOG.info("Writing domains for " + appAttemptId.toString() + " to "
|
||||
+ domainLogPath);
|
||||
this.logFDsCache.writeDomainLog(
|
||||
fs, domainLogPath, objMapper, domain, isAppendSupported);
|
||||
}
|
||||
|
||||
private static class DomainLogFD extends LogFD {
|
||||
public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
|
||||
boolean isAppendSupported) throws IOException {
|
||||
super(fs, logPath, objMapper, isAppendSupported);
|
||||
}
|
||||
|
||||
public void writeDomain(TimelineDomain domain)
|
||||
throws IOException {
|
||||
getObjectMapper().writeValue(getJsonGenerator(), domain);
|
||||
updateLastModifiedTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
private static class EntityLogFD extends LogFD {
|
||||
public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
|
||||
boolean isAppendSupported) throws IOException {
|
||||
super(fs, logPath, objMapper, isAppendSupported);
|
||||
}
|
||||
|
||||
public void writeEntities(List<TimelineEntity> entities)
|
||||
throws IOException {
|
||||
if (writerClosed()) {
|
||||
prepareForWrite();
|
||||
}
|
||||
for (TimelineEntity entity : entities) {
|
||||
getObjectMapper().writeValue(getJsonGenerator(), entity);
|
||||
}
|
||||
updateLastModifiedTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
private static class LogFD {
|
||||
private FSDataOutputStream stream;
|
||||
private ObjectMapper objMapper;
|
||||
private JsonGenerator jsonGenerator;
|
||||
private long lastModifiedTime;
|
||||
private final boolean isAppendSupported;
|
||||
private final ReentrantLock fdLock = new ReentrantLock();
|
||||
private final FileSystem fs;
|
||||
private final Path logPath;
|
||||
|
||||
public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
|
||||
boolean isAppendSupported) throws IOException {
|
||||
this.fs = fs;
|
||||
this.logPath = logPath;
|
||||
this.isAppendSupported = isAppendSupported;
|
||||
this.objMapper = objMapper;
|
||||
prepareForWrite();
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (stream != null) {
|
||||
IOUtils.cleanup(LOG, jsonGenerator);
|
||||
IOUtils.cleanup(LOG, stream);
|
||||
stream = null;
|
||||
jsonGenerator = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
if (stream != null) {
|
||||
stream.hflush();
|
||||
}
|
||||
}
|
||||
|
||||
public long getLastModifiedTime() {
|
||||
return this.lastModifiedTime;
|
||||
}
|
||||
|
||||
protected void prepareForWrite() throws IOException{
|
||||
this.stream = createLogFileStream(fs, logPath);
|
||||
this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
|
||||
this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
|
||||
this.lastModifiedTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
protected boolean writerClosed() {
|
||||
return stream == null;
|
||||
}
|
||||
|
||||
private FSDataOutputStream createLogFileStream(FileSystem fileSystem,
|
||||
Path logPathToCreate)
|
||||
throws IOException {
|
||||
FSDataOutputStream streamToCreate;
|
||||
if (!isAppendSupported) {
|
||||
logPathToCreate =
|
||||
new Path(logPathToCreate.getParent(),
|
||||
(logPathToCreate.getName() + "_" + System.currentTimeMillis()));
|
||||
}
|
||||
if (!fileSystem.exists(logPathToCreate)) {
|
||||
streamToCreate = fileSystem.create(logPathToCreate, false);
|
||||
fileSystem.setPermission(logPathToCreate,
|
||||
new FsPermission(FILE_LOG_PERMISSIONS));
|
||||
} else {
|
||||
streamToCreate = fileSystem.append(logPathToCreate);
|
||||
}
|
||||
return streamToCreate;
|
||||
}
|
||||
|
||||
public void lock() {
|
||||
this.fdLock.lock();
|
||||
}
|
||||
|
||||
public void unlock() {
|
||||
this.fdLock.unlock();
|
||||
}
|
||||
|
||||
protected JsonGenerator getJsonGenerator() {
|
||||
return jsonGenerator;
|
||||
}
|
||||
|
||||
protected ObjectMapper getObjectMapper() {
|
||||
return objMapper;
|
||||
}
|
||||
|
||||
protected void updateLastModifiedTime(long updatedTime) {
|
||||
this.lastModifiedTime = updatedTime;
|
||||
}
|
||||
}
|
||||
|
||||
private static class LogFDsCache implements Closeable, Flushable{
|
||||
private DomainLogFD domainLogFD;
|
||||
private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
|
||||
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
||||
EntityLogFD>> entityLogFDs;
|
||||
private Timer flushTimer;
|
||||
private FlushTimerTask flushTimerTask;
|
||||
private Timer cleanInActiveFDsTimer;
|
||||
private CleanInActiveFDsTask cleanInActiveFDsTask;
|
||||
private final long ttl;
|
||||
private final ReentrantLock domainFDLocker = new ReentrantLock();
|
||||
private final ReentrantLock summaryTableLocker = new ReentrantLock();
|
||||
private final ReentrantLock entityTableLocker = new ReentrantLock();
|
||||
private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
|
||||
private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
|
||||
private volatile boolean serviceStopped = false;
|
||||
|
||||
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
|
||||
long ttl) {
|
||||
domainLogFD = null;
|
||||
summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
|
||||
entityLogFDs = new HashMap<ApplicationAttemptId,
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD>>();
|
||||
this.flushTimer =
|
||||
new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
|
||||
true);
|
||||
this.flushTimerTask = new FlushTimerTask();
|
||||
this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
|
||||
flushIntervalSecs * 1000);
|
||||
|
||||
this.cleanInActiveFDsTimer =
|
||||
new Timer(LogFDsCache.class.getSimpleName() +
|
||||
"cleanInActiveFDsTimer", true);
|
||||
this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
|
||||
this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
|
||||
cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
|
||||
this.ttl = ttl * 1000;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
try {
|
||||
this.domainFDLocker.lock();
|
||||
if (domainLogFD != null) {
|
||||
domainLogFD.flush();
|
||||
}
|
||||
} finally {
|
||||
this.domainFDLocker.unlock();
|
||||
}
|
||||
|
||||
flushSummaryFDMap(copySummaryLogFDs(summanyLogFDs));
|
||||
|
||||
flushEntityFDMap(copyEntityLogFDs(entityLogFDs));
|
||||
}
|
||||
|
||||
private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
|
||||
Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) {
|
||||
try {
|
||||
summaryTableCopyLocker.lock();
|
||||
return new HashMap<ApplicationAttemptId, EntityLogFD>(
|
||||
summanyLogFDsToCopy);
|
||||
} finally {
|
||||
summaryTableCopyLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
||||
EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
|
||||
try {
|
||||
entityTableCopyLocker.lock();
|
||||
return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
||||
EntityLogFD>>(entityLogFDsToCopy);
|
||||
} finally {
|
||||
entityTableCopyLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void flushSummaryFDMap(Map<ApplicationAttemptId,
|
||||
EntityLogFD> logFDs) throws IOException {
|
||||
if (!logFDs.isEmpty()) {
|
||||
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
|
||||
.entrySet()) {
|
||||
EntityLogFD logFD = logFDEntry.getValue();
|
||||
try {
|
||||
logFD.lock();
|
||||
logFD.flush();
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
|
||||
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
|
||||
if (!logFDs.isEmpty()) {
|
||||
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
||||
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
|
||||
= logFDMapEntry.getValue();
|
||||
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
|
||||
: logFDMap.entrySet()) {
|
||||
EntityLogFD logFD = logFDEntry.getValue();
|
||||
try {
|
||||
logFD.lock();
|
||||
logFD.flush();
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class FlushTimerTask extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
flush();
|
||||
} catch (Exception e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanInActiveFDs() {
|
||||
long currentTimeStamp = System.currentTimeMillis();
|
||||
try {
|
||||
this.domainFDLocker.lock();
|
||||
if (domainLogFD != null) {
|
||||
if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
|
||||
domainLogFD.close();
|
||||
domainLogFD = null;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.domainFDLocker.unlock();
|
||||
}
|
||||
|
||||
cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summanyLogFDs),
|
||||
currentTimeStamp);
|
||||
|
||||
cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs),
|
||||
currentTimeStamp);
|
||||
}
|
||||
|
||||
private void cleanInActiveSummaryFDsforMap(
|
||||
Map<ApplicationAttemptId, EntityLogFD> logFDs,
|
||||
long currentTimeStamp) {
|
||||
if (!logFDs.isEmpty()) {
|
||||
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
|
||||
.entrySet()) {
|
||||
EntityLogFD logFD = logFDEntry.getValue();
|
||||
try {
|
||||
logFD.lock();
|
||||
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
|
||||
logFD.close();
|
||||
}
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
|
||||
long currentTimeStamp) {
|
||||
if (!logFDs.isEmpty()) {
|
||||
for (Entry<ApplicationAttemptId, HashMap<
|
||||
TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
|
||||
: logFDs.entrySet()) {
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
|
||||
= logFDMapEntry.getValue();
|
||||
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
|
||||
: logFDMap.entrySet()) {
|
||||
EntityLogFD logFD = logFDEntry.getValue();
|
||||
try {
|
||||
logFD.lock();
|
||||
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
|
||||
logFD.close();
|
||||
}
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class CleanInActiveFDsTask extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
cleanInActiveFDs();
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
serviceStopped = true;
|
||||
|
||||
flushTimer.cancel();
|
||||
cleanInActiveFDsTimer.cancel();
|
||||
|
||||
try {
|
||||
this.domainFDLocker.lock();
|
||||
if (domainLogFD != null) {
|
||||
domainLogFD.close();
|
||||
domainLogFD = null;
|
||||
}
|
||||
} finally {
|
||||
this.domainFDLocker.unlock();
|
||||
}
|
||||
|
||||
closeSummaryFDs(summanyLogFDs);
|
||||
|
||||
closeEntityFDs(entityLogFDs);
|
||||
}
|
||||
|
||||
private void closeEntityFDs(Map<ApplicationAttemptId,
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
|
||||
try {
|
||||
entityTableLocker.lock();
|
||||
if (!logFDs.isEmpty()) {
|
||||
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
||||
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
|
||||
= logFDMapEntry.getValue();
|
||||
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
|
||||
: logFDMap.entrySet()) {
|
||||
EntityLogFD logFD = logFDEntry.getValue();
|
||||
try {
|
||||
logFD.lock();
|
||||
logFD.close();
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
entityTableLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void closeSummaryFDs(
|
||||
Map<ApplicationAttemptId, EntityLogFD> logFDs) {
|
||||
try {
|
||||
summaryTableLocker.lock();
|
||||
if (!logFDs.isEmpty()) {
|
||||
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
|
||||
: logFDs.entrySet()) {
|
||||
EntityLogFD logFD = logFDEntry.getValue();
|
||||
try {
|
||||
logFD.lock();
|
||||
logFD.close();
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
summaryTableLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeDomainLog(FileSystem fs, Path logPath,
|
||||
ObjectMapper objMapper, TimelineDomain domain,
|
||||
boolean isAppendSupported) throws IOException {
|
||||
try {
|
||||
this.domainFDLocker.lock();
|
||||
if (this.domainLogFD != null) {
|
||||
this.domainLogFD.writeDomain(domain);
|
||||
} else {
|
||||
this.domainLogFD =
|
||||
new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
|
||||
this.domainLogFD.writeDomain(domain);
|
||||
}
|
||||
} finally {
|
||||
this.domainFDLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeEntityLogs(FileSystem fs, Path entityLogPath,
|
||||
ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
|
||||
TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
|
||||
boolean isAppendSupported) throws IOException{
|
||||
writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
|
||||
groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
|
||||
}
|
||||
|
||||
private void writeEntityLogs(FileSystem fs, Path logPath,
|
||||
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
||||
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
|
||||
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
|
||||
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD>logMapFD
|
||||
= logFDs.get(attemptId);
|
||||
if (logMapFD != null) {
|
||||
EntityLogFD logFD = logMapFD.get(groupId);
|
||||
if (logFD != null) {
|
||||
try {
|
||||
logFD.lock();
|
||||
if (serviceStopped) {
|
||||
return;
|
||||
}
|
||||
logFD.writeEntities(entities);
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
} else {
|
||||
createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
|
||||
entities, isAppendSupported, logFDs);
|
||||
}
|
||||
} else {
|
||||
createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
|
||||
entities, isAppendSupported, logFDs);
|
||||
}
|
||||
}
|
||||
|
||||
private void createEntityFDandWrite(FileSystem fs, Path logPath,
|
||||
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
||||
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
|
||||
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
|
||||
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException{
|
||||
try {
|
||||
entityTableLocker.lock();
|
||||
if (serviceStopped) {
|
||||
return;
|
||||
}
|
||||
HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
|
||||
logFDs.get(attemptId);
|
||||
if (logFDMap == null) {
|
||||
logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
|
||||
}
|
||||
EntityLogFD logFD = logFDMap.get(groupId);
|
||||
if (logFD == null) {
|
||||
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
|
||||
}
|
||||
try {
|
||||
logFD.lock();
|
||||
logFD.writeEntities(entities);
|
||||
try {
|
||||
entityTableCopyLocker.lock();
|
||||
logFDMap.put(groupId, logFD);
|
||||
logFDs.put(attemptId, logFDMap);
|
||||
} finally {
|
||||
entityTableCopyLocker.unlock();
|
||||
}
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
} finally {
|
||||
entityTableLocker.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeSummaryEntityLogs(FileSystem fs, Path logPath,
|
||||
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
||||
List<TimelineEntity> entities, boolean isAppendSupported)
|
||||
throws IOException {
|
||||
writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
|
||||
isAppendSupported, this.summanyLogFDs);
|
||||
}
|
||||
|
||||
private void writeSummmaryEntityLogs(FileSystem fs, Path logPath,
|
||||
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
||||
List<TimelineEntity> entities, boolean isAppendSupported,
|
||||
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
|
||||
EntityLogFD logFD = null;
|
||||
logFD = logFDs.get(attemptId);
|
||||
if (logFD != null) {
|
||||
try {
|
||||
logFD.lock();
|
||||
if (serviceStopped) {
|
||||
return;
|
||||
}
|
||||
logFD.writeEntities(entities);
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
} else {
|
||||
createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities,
|
||||
isAppendSupported, logFDs);
|
||||
}
|
||||
}
|
||||
|
||||
private void createSummaryFDAndWrite(FileSystem fs, Path logPath,
|
||||
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
||||
List<TimelineEntity> entities, boolean isAppendSupported,
|
||||
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
|
||||
try {
|
||||
summaryTableLocker.lock();
|
||||
if (serviceStopped) {
|
||||
return;
|
||||
}
|
||||
EntityLogFD logFD = logFDs.get(attemptId);
|
||||
if (logFD == null) {
|
||||
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
|
||||
}
|
||||
try {
|
||||
logFD.lock();
|
||||
logFD.writeEntities(entities);
|
||||
try {
|
||||
summaryTableCopyLocker.lock();
|
||||
logFDs.put(attemptId, logFD);
|
||||
} finally {
|
||||
summaryTableCopyLocker.unlock();
|
||||
}
|
||||
} finally {
|
||||
logFD.unlock();
|
||||
}
|
||||
} finally {
|
||||
summaryTableLocker.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,8 +33,6 @@ import java.security.PrivilegedExceptionAction;
|
|||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.HttpsURLConnection;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
|
@ -54,19 +52,19 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
|
|||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -74,7 +72,6 @@ import com.sun.jersey.api.client.Client;
|
|||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.ClientRequest;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.api.client.config.ClientConfig;
|
||||
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||
import com.sun.jersey.api.client.filter.ClientFilter;
|
||||
|
@ -110,6 +107,9 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
private URI resURI;
|
||||
private UserGroupInformation authUgi;
|
||||
private String doAsUser;
|
||||
private Configuration configuration;
|
||||
private float timelineServiceVersion;
|
||||
private TimelineWriter timelineWriter;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
|
@ -254,6 +254,7 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
}
|
||||
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.configuration = conf;
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation realUgi = ugi.getRealUser();
|
||||
if (realUgi != null) {
|
||||
|
@ -293,58 +294,48 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
RESOURCE_URI_STR));
|
||||
}
|
||||
LOG.info("Timeline service address: " + resURI);
|
||||
timelineServiceVersion =
|
||||
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
timelineWriter = createTimelineWriter(
|
||||
configuration, authUgi, client, resURI);
|
||||
}
|
||||
|
||||
protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
UserGroupInformation ugi, Client webClient, URI uri)
|
||||
throws IOException {
|
||||
if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
|
||||
return new FileSystemTimelineWriter(
|
||||
conf, ugi, webClient, uri);
|
||||
} else {
|
||||
return new DirectTimelineWriter(ugi, webClient, uri);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (this.timelineWriter != null) {
|
||||
this.timelineWriter.close();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelinePutResponse putEntities(
|
||||
TimelineEntity... entities) throws IOException, YarnException {
|
||||
TimelineEntities entitiesContainer = new TimelineEntities();
|
||||
for (TimelineEntity entity : entities) {
|
||||
if (entity.getEntityId() == null || entity.getEntityType() == null) {
|
||||
throw new YarnException("Incomplete entity without entity id/type");
|
||||
}
|
||||
entitiesContainer.addEntity(entity);
|
||||
}
|
||||
ClientResponse resp = doPosting(entitiesContainer, null);
|
||||
return resp.getEntity(TimelinePutResponse.class);
|
||||
return timelineWriter.putEntities(entities);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void putDomain(TimelineDomain domain) throws IOException,
|
||||
YarnException {
|
||||
doPosting(domain, "domain");
|
||||
}
|
||||
|
||||
private ClientResponse doPosting(final Object obj, final String path)
|
||||
throws IOException, YarnException {
|
||||
ClientResponse resp;
|
||||
try {
|
||||
resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
|
||||
@Override
|
||||
public ClientResponse run() throws Exception {
|
||||
return doPostingObject(obj, path);
|
||||
}
|
||||
});
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException(ie);
|
||||
}
|
||||
if (resp == null ||
|
||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||
String msg =
|
||||
"Failed to get the response from the timeline server.";
|
||||
LOG.error(msg);
|
||||
if (LOG.isDebugEnabled() && resp != null) {
|
||||
String output = resp.getEntity(String.class);
|
||||
LOG.debug("HTTP error code: " + resp.getStatus()
|
||||
+ " Server response : \n" + output);
|
||||
}
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
return resp;
|
||||
timelineWriter.putDomain(domain);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -470,23 +461,6 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
return connectionRetry.retryOn(tokenRetryOp);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public ClientResponse doPostingObject(Object object, String path) {
|
||||
WebResource webResource = client.resource(resURI);
|
||||
if (path == null) {
|
||||
return webResource.accept(MediaType.APPLICATION_JSON)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class, object);
|
||||
} else if (path.equals("domain")) {
|
||||
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class, object);
|
||||
} else {
|
||||
throw new YarnRuntimeException("Unknown resource type");
|
||||
}
|
||||
}
|
||||
|
||||
private class TimelineURLConnectionFactory
|
||||
implements HttpURLConnectionFactory {
|
||||
|
||||
|
@ -663,4 +637,34 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
public UserGroupInformation getUgi() {
|
||||
return authUgi;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
|
||||
TimelineEntityGroupId groupId, TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
|
||||
throw new YarnException(
|
||||
"This API is not supported under current Timeline Service Version: "
|
||||
+ timelineServiceVersion);
|
||||
}
|
||||
|
||||
return timelineWriter.putEntities(appAttemptId, groupId, entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putDomain(ApplicationAttemptId appAttemptId,
|
||||
TimelineDomain domain) throws IOException, YarnException {
|
||||
if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
|
||||
throw new YarnException(
|
||||
"This API is not supported under current Timeline Service Version: "
|
||||
+ timelineServiceVersion);
|
||||
}
|
||||
timelineWriter.putDomain(appAttemptId, domain);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setTimelineWriter(TimelineWriter writer) {
|
||||
this.timelineWriter = writer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.WebResource;
|
||||
|
||||
/**
|
||||
* Base writer class to write the Timeline data.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract class TimelineWriter {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TimelineWriter.class);
|
||||
|
||||
private UserGroupInformation authUgi;
|
||||
private Client client;
|
||||
private URI resURI;
|
||||
|
||||
public TimelineWriter(UserGroupInformation authUgi, Client client,
|
||||
URI resURI) {
|
||||
this.authUgi = authUgi;
|
||||
this.client = client;
|
||||
this.resURI = resURI;
|
||||
}
|
||||
|
||||
public void close() throws Exception {
|
||||
// DO NOTHING
|
||||
}
|
||||
|
||||
public TimelinePutResponse putEntities(
|
||||
TimelineEntity... entities) throws IOException, YarnException {
|
||||
TimelineEntities entitiesContainer = new TimelineEntities();
|
||||
for (TimelineEntity entity : entities) {
|
||||
if (entity.getEntityId() == null || entity.getEntityType() == null) {
|
||||
throw new YarnException("Incomplete entity without entity id/type");
|
||||
}
|
||||
entitiesContainer.addEntity(entity);
|
||||
}
|
||||
ClientResponse resp = doPosting(entitiesContainer, null);
|
||||
return resp.getEntity(TimelinePutResponse.class);
|
||||
}
|
||||
|
||||
public void putDomain(TimelineDomain domain) throws IOException,
|
||||
YarnException {
|
||||
doPosting(domain, "domain");
|
||||
}
|
||||
|
||||
public abstract TimelinePutResponse putEntities(
|
||||
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
|
||||
TimelineEntity... entities) throws IOException, YarnException;
|
||||
|
||||
public abstract void putDomain(ApplicationAttemptId appAttemptId,
|
||||
TimelineDomain domain) throws IOException, YarnException;
|
||||
|
||||
private ClientResponse doPosting(final Object obj, final String path)
|
||||
throws IOException, YarnException {
|
||||
ClientResponse resp;
|
||||
try {
|
||||
resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
|
||||
@Override
|
||||
public ClientResponse run() throws Exception {
|
||||
return doPostingObject(obj, path);
|
||||
}
|
||||
});
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException(ie);
|
||||
}
|
||||
if (resp == null ||
|
||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||
String msg =
|
||||
"Failed to get the response from the timeline server.";
|
||||
LOG.error(msg);
|
||||
if (LOG.isDebugEnabled() && resp != null) {
|
||||
String output = resp.getEntity(String.class);
|
||||
LOG.debug("HTTP error code: " + resp.getStatus()
|
||||
+ " Server response : \n" + output);
|
||||
}
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public ClientResponse doPostingObject(Object object, String path) {
|
||||
WebResource webResource = client.resource(resURI);
|
||||
if (path == null) {
|
||||
return webResource.accept(MediaType.APPLICATION_JSON)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.post(ClientResponse.class, object);
|
||||
} else if (path.equals("domain")) {
|
||||
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class, object);
|
||||
} else {
|
||||
throw new YarnRuntimeException("Unknown resource type");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestTimelineEntityGroupId {
|
||||
|
||||
@Test
|
||||
public void testTimelineEntityGroupId() {
|
||||
ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
|
||||
ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
|
||||
TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
|
||||
TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
|
||||
TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
|
||||
TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");
|
||||
|
||||
Assert.assertTrue(group1.equals(group4));
|
||||
Assert.assertFalse(group1.equals(group2));
|
||||
Assert.assertFalse(group1.equals(group3));
|
||||
|
||||
Assert.assertTrue(group1.compareTo(group4) == 0);
|
||||
Assert.assertTrue(group1.compareTo(group2) < 0);
|
||||
Assert.assertTrue(group1.compareTo(group3) < 0);
|
||||
|
||||
Assert.assertTrue(group1.hashCode() == group4.hashCode());
|
||||
Assert.assertFalse(group1.hashCode() == group2.hashCode());
|
||||
Assert.assertFalse(group1.hashCode() == group3.hashCode());
|
||||
|
||||
Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
|
||||
Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
|
||||
}
|
||||
}
|
|
@ -25,8 +25,11 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -37,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||
|
@ -46,17 +48,20 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
|
||||
public class TestTimelineClient {
|
||||
|
||||
private TimelineClientImpl client;
|
||||
private TimelineWriter spyTimelineWriter;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
|
||||
client = createTimelineClient(conf);
|
||||
}
|
||||
|
||||
|
@ -69,7 +74,8 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPostEntities() throws Exception {
|
||||
mockEntityClientResponse(client, ClientResponse.Status.OK, false, false);
|
||||
mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK,
|
||||
false, false);
|
||||
try {
|
||||
TimelinePutResponse response = client.putEntities(generateEntity());
|
||||
Assert.assertEquals(0, response.getErrors().size());
|
||||
|
@ -80,7 +86,8 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPostEntitiesWithError() throws Exception {
|
||||
mockEntityClientResponse(client, ClientResponse.Status.OK, true, false);
|
||||
mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, true,
|
||||
false);
|
||||
try {
|
||||
TimelinePutResponse response = client.putEntities(generateEntity());
|
||||
Assert.assertEquals(1, response.getErrors().size());
|
||||
|
@ -106,8 +113,8 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPostEntitiesNoResponse() throws Exception {
|
||||
mockEntityClientResponse(
|
||||
client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
|
||||
mockEntityClientResponse(spyTimelineWriter,
|
||||
ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
|
||||
try {
|
||||
client.putEntities(generateEntity());
|
||||
Assert.fail("Exception is expected");
|
||||
|
@ -119,7 +126,7 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPostEntitiesConnectionRefused() throws Exception {
|
||||
mockEntityClientResponse(client, null, false, true);
|
||||
mockEntityClientResponse(spyTimelineWriter, null, false, true);
|
||||
try {
|
||||
client.putEntities(generateEntity());
|
||||
Assert.fail("RuntimeException is expected");
|
||||
|
@ -130,7 +137,7 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPutDomain() throws Exception {
|
||||
mockDomainClientResponse(client, ClientResponse.Status.OK, false);
|
||||
mockDomainClientResponse(spyTimelineWriter, ClientResponse.Status.OK, false);
|
||||
try {
|
||||
client.putDomain(generateDomain());
|
||||
} catch (YarnException e) {
|
||||
|
@ -140,7 +147,8 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPutDomainNoResponse() throws Exception {
|
||||
mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false);
|
||||
mockDomainClientResponse(spyTimelineWriter,
|
||||
ClientResponse.Status.FORBIDDEN, false);
|
||||
try {
|
||||
client.putDomain(generateDomain());
|
||||
Assert.fail("Exception is expected");
|
||||
|
@ -152,7 +160,7 @@ public class TestTimelineClient {
|
|||
|
||||
@Test
|
||||
public void testPutDomainConnectionRefused() throws Exception {
|
||||
mockDomainClientResponse(client, null, true);
|
||||
mockDomainClientResponse(spyTimelineWriter, null, true);
|
||||
try {
|
||||
client.putDomain(generateDomain());
|
||||
Assert.fail("RuntimeException is expected");
|
||||
|
@ -291,15 +299,16 @@ public class TestTimelineClient {
|
|||
}
|
||||
|
||||
private static ClientResponse mockEntityClientResponse(
|
||||
TimelineClientImpl client, ClientResponse.Status status,
|
||||
TimelineWriter spyTimelineWriter, ClientResponse.Status status,
|
||||
boolean hasError, boolean hasRuntimeError) {
|
||||
ClientResponse response = mock(ClientResponse.class);
|
||||
if (hasRuntimeError) {
|
||||
doThrow(new ClientHandlerException(new ConnectException())).when(client)
|
||||
.doPostingObject(any(TimelineEntities.class), any(String.class));
|
||||
doThrow(new ClientHandlerException(new ConnectException())).when(
|
||||
spyTimelineWriter).doPostingObject(
|
||||
any(TimelineEntities.class), any(String.class));
|
||||
return response;
|
||||
}
|
||||
doReturn(response).when(client)
|
||||
doReturn(response).when(spyTimelineWriter)
|
||||
.doPostingObject(any(TimelineEntities.class), any(String.class));
|
||||
when(response.getClientResponseStatus()).thenReturn(status);
|
||||
TimelinePutResponse.TimelinePutError error =
|
||||
|
@ -316,15 +325,16 @@ public class TestTimelineClient {
|
|||
}
|
||||
|
||||
private static ClientResponse mockDomainClientResponse(
|
||||
TimelineClientImpl client, ClientResponse.Status status,
|
||||
TimelineWriter spyTimelineWriter, ClientResponse.Status status,
|
||||
boolean hasRuntimeError) {
|
||||
ClientResponse response = mock(ClientResponse.class);
|
||||
if (hasRuntimeError) {
|
||||
doThrow(new ClientHandlerException(new ConnectException())).when(client)
|
||||
.doPostingObject(any(TimelineDomain.class), any(String.class));
|
||||
doThrow(new ClientHandlerException(new ConnectException())).when(
|
||||
spyTimelineWriter).doPostingObject(any(TimelineDomain.class),
|
||||
any(String.class));
|
||||
return response;
|
||||
}
|
||||
doReturn(response).when(client)
|
||||
doReturn(response).when(spyTimelineWriter)
|
||||
.doPostingObject(any(TimelineDomain.class), any(String.class));
|
||||
when(response.getClientResponseStatus()).thenReturn(status);
|
||||
return response;
|
||||
|
@ -365,10 +375,19 @@ public class TestTimelineClient {
|
|||
return domain;
|
||||
}
|
||||
|
||||
private static TimelineClientImpl createTimelineClient(
|
||||
private TimelineClientImpl createTimelineClient(
|
||||
YarnConfiguration conf) {
|
||||
TimelineClientImpl client =
|
||||
spy((TimelineClientImpl) TimelineClient.createTimelineClient());
|
||||
TimelineClientImpl client = new TimelineClientImpl() {
|
||||
@Override
|
||||
protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
UserGroupInformation authUgi, Client client, URI resURI)
|
||||
throws IOException {
|
||||
TimelineWriter timelineWriter =
|
||||
new DirectTimelineWriter(authUgi, client, resURI);
|
||||
spyTimelineWriter = spy(timelineWriter);
|
||||
return spyTimelineWriter;
|
||||
}
|
||||
};
|
||||
client.init(conf);
|
||||
client.start();
|
||||
return client;
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.reset;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
|
||||
public class TestTimelineClientForATS1_5 {
|
||||
|
||||
protected static Log LOG = LogFactory
|
||||
.getLog(TestTimelineClientForATS1_5.class);
|
||||
|
||||
private TimelineClientImpl client;
|
||||
private static FileContext localFS;
|
||||
private static File localActiveDir;
|
||||
private TimelineWriter spyTimelineWriter;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
localFS = FileContext.getLocalFSFileContext();
|
||||
localActiveDir =
|
||||
new File("target", this.getClass().getSimpleName() + "-activeDir")
|
||||
.getAbsoluteFile();
|
||||
localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
|
||||
localActiveDir.mkdir();
|
||||
LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
|
||||
localActiveDir.getAbsolutePath());
|
||||
conf.set(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
|
||||
"summary_type");
|
||||
client = createTimelineClient(conf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (client != null) {
|
||||
client.stop();
|
||||
}
|
||||
localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPostEntities() throws Exception {
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
TimelineEntityGroupId groupId =
|
||||
TimelineEntityGroupId.newInstance(appId, "1");
|
||||
TimelineEntityGroupId groupId2 =
|
||||
TimelineEntityGroupId.newInstance(appId, "2");
|
||||
// Create two entities, includes an entity type and a summary type
|
||||
TimelineEntity[] entities = new TimelineEntity[2];
|
||||
entities[0] = generateEntity("entity_type");
|
||||
entities[1] = generateEntity("summary_type");
|
||||
try {
|
||||
// if attemptid is null, fall back to the original putEntities call, and
|
||||
// save the entity
|
||||
// into configured levelDB store
|
||||
client.putEntities(null, null, entities);
|
||||
verify(spyTimelineWriter, times(1)).putEntities(entities);
|
||||
reset(spyTimelineWriter);
|
||||
|
||||
// if the attemptId is specified, but groupId is given as null, it would
|
||||
// fall back to the original putEntities call if we have the entity type.
|
||||
// the entity which is summary type would be written into FS
|
||||
ApplicationAttemptId attemptId1 =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
client.putEntities(attemptId1, null, entities);
|
||||
TimelineEntity[] entityTDB = new TimelineEntity[1];
|
||||
entityTDB[0] = entities[0];
|
||||
verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
|
||||
Assert.assertTrue(localFS.util().exists(
|
||||
new Path(getAppAttemptDir(attemptId1), "summarylog-"
|
||||
+ attemptId1.toString())));
|
||||
reset(spyTimelineWriter);
|
||||
|
||||
// if we specified attemptId as well as groupId, it would save the entity
|
||||
// into
|
||||
// FileSystem instead of levelDB store
|
||||
ApplicationAttemptId attemptId2 =
|
||||
ApplicationAttemptId.newInstance(appId, 2);
|
||||
client.putEntities(attemptId2, groupId, entities);
|
||||
client.putEntities(attemptId2, groupId2, entities);
|
||||
verify(spyTimelineWriter, times(0)).putEntities(
|
||||
any(TimelineEntity[].class));
|
||||
Assert.assertTrue(localFS.util().exists(
|
||||
new Path(getAppAttemptDir(attemptId2), "summarylog-"
|
||||
+ attemptId2.toString())));
|
||||
Assert.assertTrue(localFS.util().exists(
|
||||
new Path(getAppAttemptDir(attemptId2), "entitylog-"
|
||||
+ groupId.toString())));
|
||||
Assert.assertTrue(localFS.util().exists(
|
||||
new Path(getAppAttemptDir(attemptId2), "entitylog-"
|
||||
+ groupId2.toString())));
|
||||
reset(spyTimelineWriter);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Exception is not expected. " + e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutDomain() {
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationAttemptId attemptId1 =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
try {
|
||||
TimelineDomain domain = generateDomain();
|
||||
|
||||
client.putDomain(null, domain);
|
||||
verify(spyTimelineWriter, times(1)).putDomain(domain);
|
||||
reset(spyTimelineWriter);
|
||||
|
||||
client.putDomain(attemptId1, domain);
|
||||
verify(spyTimelineWriter, times(0)).putDomain(domain);
|
||||
Assert.assertTrue(localFS.util().exists(
|
||||
new Path(getAppAttemptDir(attemptId1), "domainlog-"
|
||||
+ attemptId1.toString())));
|
||||
reset(spyTimelineWriter);
|
||||
} catch (Exception e) {
|
||||
Assert.fail("Exception is not expected." + e);
|
||||
}
|
||||
}
|
||||
|
||||
private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
|
||||
Path appDir =
|
||||
new Path(localActiveDir.getAbsolutePath(), appAttemptId
|
||||
.getApplicationId().toString());
|
||||
Path attemptDir = new Path(appDir, appAttemptId.toString());
|
||||
return attemptDir;
|
||||
}
|
||||
|
||||
private static TimelineEntity generateEntity(String type) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setEntityId("entity id");
|
||||
entity.setEntityType(type);
|
||||
entity.setStartTime(System.currentTimeMillis());
|
||||
return entity;
|
||||
}
|
||||
|
||||
private static TimelineDomain generateDomain() {
|
||||
TimelineDomain domain = new TimelineDomain();
|
||||
domain.setId("namesapce id");
|
||||
domain.setDescription("domain description");
|
||||
domain.setOwner("domain owner");
|
||||
domain.setReaders("domain_reader");
|
||||
domain.setWriters("domain_writer");
|
||||
domain.setCreatedTime(0L);
|
||||
domain.setModifiedTime(1L);
|
||||
return domain;
|
||||
}
|
||||
|
||||
private TimelineClientImpl createTimelineClient(YarnConfiguration conf) {
|
||||
TimelineClientImpl client = new TimelineClientImpl() {
|
||||
@Override
|
||||
protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
UserGroupInformation authUgi, Client client, URI resURI)
|
||||
throws IOException {
|
||||
TimelineWriter timelineWriter =
|
||||
new FileSystemTimelineWriter(conf, authUgi, client, resURI) {
|
||||
public ClientResponse doPostingObject(Object object, String path) {
|
||||
ClientResponse response = mock(ClientResponse.class);
|
||||
when(response.getClientResponseStatus()).thenReturn(
|
||||
ClientResponse.Status.OK);
|
||||
return response;
|
||||
}
|
||||
};
|
||||
spyTimelineWriter = spy(timelineWriter);
|
||||
return spyTimelineWriter;
|
||||
}
|
||||
};
|
||||
|
||||
client.init(conf);
|
||||
client.start();
|
||||
return client;
|
||||
}
|
||||
}
|
|
@ -19,15 +19,20 @@
|
|||
package org.apache.hadoop.yarn.server.timeline.webapp;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
|
||||
|
@ -39,6 +44,7 @@ import org.junit.Assert;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
|
||||
public class TestTimelineWebServicesWithSSL {
|
||||
|
@ -60,6 +66,7 @@ public class TestTimelineWebServicesWithSSL {
|
|||
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
|
||||
MemoryTimelineStore.class, TimelineStore.class);
|
||||
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
|
||||
|
||||
File base = new File(BASEDIR);
|
||||
FileUtil.fullyDelete(base);
|
||||
|
@ -123,11 +130,17 @@ public class TestTimelineWebServicesWithSSL {
|
|||
private ClientResponse resp;
|
||||
|
||||
@Override
|
||||
public ClientResponse doPostingObject(Object obj, String path) {
|
||||
resp = super.doPostingObject(obj, path);
|
||||
return resp;
|
||||
protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
UserGroupInformation authUgi, Client client, URI resURI)
|
||||
throws IOException {
|
||||
return new DirectTimelineWriter(authUgi, client, resURI) {
|
||||
@Override
|
||||
public ClientResponse doPostingObject(Object obj, String path) {
|
||||
resp = super.doPostingObject(obj, path);
|
||||
return resp;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
|
||||
SSUUMMMMAARRYY OOFF LLEESSSS CCOOMMMMAANNDDSS
|
||||
|
||||
Commands marked with * may be preceded by a number, _N.
|
||||
Notes in parentheses indicate the behavior if _N is given.
|
||||
|
||||
h H Display this help.
|
||||
q :q Q :Q ZZ Exit.
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
MMOOVVIINNGG
|
||||
|
||||
e ^E j ^N CR * Forward one line (or _N lines).
|
||||
y ^Y k ^K ^P * Backward one line (or _N lines).
|
||||
f ^F ^V SPACE * Forward one window (or _N lines).
|
||||
b ^B ESC-v * Backward one window (or _N lines).
|
||||
z * Forward one window (and set window to _N).
|
||||
w * Backward one window (and set window to _N).
|
||||
ESC-SPACE * Forward one window, but don't stop at end-of-file.
|
||||
d ^D * Forward one half-window (and set half-window to _N).
|
||||
u ^U * Backward one half-window (and set half-window to _N).
|
||||
ESC-) RightArrow * Left one half screen width (or _N positions).
|
||||
ESC-( LeftArrow * Right one half screen width (or _N positions).
|
||||
F Forward forever; like "tail -f".
|
||||
r ^R ^L Repaint screen.
|
||||
R Repaint screen, discarding buffered input.
|
||||
---------------------------------------------------
|
||||
Default "window" is the screen height.
|
||||
Default "half-window" is half of the screen height.
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
SSEEAARRCCHHIINNGG
|
||||
|
||||
/_p_a_t_t_e_r_n * Search forward for (_N-th) matching line.
|
||||
?_p_a_t_t_e_r_n * Search backward for (_N-th) matching line.
|
||||
n * Repeat previous search (for _N-th occurrence).
|
||||
N * Repeat previous search in reverse direction.
|
||||
ESC-n * Repeat previous search, spanning files.
|
||||
ESC-N * Repeat previous search, reverse dir. & spanning files.
|
||||
ESC-u Undo (toggle) search highlighting.
|
||||
&_p_a_t_t_e_r_n * Display only matching lines
|
||||
---------------------------------------------------
|
||||
Search patterns may be modified by one or more of:
|
||||
^N or ! Search for NON-matching lines.
|
||||
^E or * Search multiple files (pass thru END OF FILE).
|
||||
^F or @ Start search at FIRST file (for /) or last file (for ?).
|
||||
^K Highlight matches, but don't move (KEEP position).
|
||||
^R Don't use REGULAR EXPRESSIONS.
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
JJUUMMPPIINNGG
|
||||
|
||||
g < ESC-< * Go to first line in file (or line _N).
|
||||
G > ESC-> * Go to last line in file (or line _N).
|
||||
p % * Go to beginning of file (or _N percent into file).
|
||||
t * Go to the (_N-th) next tag.
|
||||
T * Go to the (_N-th) previous tag.
|
||||
{ ( [ * Find close bracket } ) ].
|
||||
} ) ] * Find open bracket { ( [.
|
||||
ESC-^F _<_c_1_> _<_c_2_> * Find close bracket _<_c_2_>.
|
||||
ESC-^B _<_c_1_> _<_c_2_> * Find open bracket _<_c_1_>
|
||||
---------------------------------------------------
|
||||
Each "find close bracket" command goes forward to the close bracket
|
||||
matching the (_N-th) open bracket in the top line.
|
||||
Each "find open bracket" command goes backward to the open bracket
|
||||
matching the (_N-th) close bracket in the bottom line.
|
||||
|
||||
m_<_l_e_t_t_e_r_> Mark the current position with <letter>.
|
||||
'_<_l_e_t_t_e_r_> Go to a previously marked position.
|
||||
'' Go to the previous position.
|
||||
^X^X Same as '.
|
||||
---------------------------------------------------
|
||||
A mark is any upper-case or lower-case letter.
|
||||
Certain marks are predefined:
|
||||
^ means beginning of the file
|
||||
$ means end of the file
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
CCHHAANNGGIINNGG FFIILLEESS
|
||||
|
||||
:e [_f_i_l_e] Examine a new file.
|
||||
^X^V Same as :e.
|
||||
:n * Examine the (_N-th) next file from the command line.
|
||||
:p * Examine the (_N-th) previous file from the command line.
|
||||
:x * Examine the first (or _N-th) file from the command line.
|
||||
:d Delete the current file from the command line list.
|
||||
= ^G :f Print current file name.
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
MMIISSCCEELLLLAANNEEOOUUSS CCOOMMMMAANNDDSS
|
||||
|
||||
-_<_f_l_a_g_> Toggle a command line option [see OPTIONS below].
|
||||
--_<_n_a_m_e_> Toggle a command line option, by name.
|
||||
__<_f_l_a_g_> Display the setting of a command line option.
|
||||
___<_n_a_m_e_> Display the setting of an option, by name.
|
||||
+_c_m_d Execute the less cmd each time a new file is examined.
|
||||
|
||||
!_c_o_m_m_a_n_d Execute the shell command with $SHELL.
|
||||
|XX_c_o_m_m_a_n_d Pipe file between current pos & mark XX to shell command.
|
||||
v Edit the current file with $VISUAL or $EDITOR.
|
||||
V Print version number of "less".
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
OOPPTTIIOONNSS
|
||||
|
||||
Most options may be changed either on the command line,
|
||||
or from within less by using the - or -- command.
|
||||
Options may be given in one of two forms: either a single
|
||||
character preceded by a -, or a name preceeded by --.
|
||||
|
||||
-? ........ --help
|
||||
Display help (from command line).
|
||||
-a ........ --search-skip-screen
|
||||
Forward search skips current screen.
|
||||
-b [_N] .... --buffers=[_N]
|
||||
Number of buffers.
|
||||
-B ........ --auto-buffers
|
||||
Don't automatically allocate buffers for pipes.
|
||||
-c ........ --clear-screen
|
||||
Repaint by clearing rather than scrolling.
|
||||
-d ........ --dumb
|
||||
Dumb terminal.
|
||||
-D [_x_n_._n] . --color=_x_n_._n
|
||||
Set screen colors. (MS-DOS only)
|
||||
-e -E .... --quit-at-eof --QUIT-AT-EOF
|
||||
Quit at end of file.
|
||||
-f ........ --force
|
||||
Force open non-regular files.
|
||||
-F ........ --quit-if-one-screen
|
||||
Quit if entire file fits on first screen.
|
||||
-g ........ --hilite-search
|
||||
Highlight only last match for searches.
|
||||
-G ........ --HILITE-SEARCH
|
||||
Don't highlight any matches for searches.
|
||||
-h [_N] .... --max-back-scroll=[_N]
|
||||
Backward scroll limit.
|
||||
-i ........ --ignore-case
|
||||
Ignore case in searches that do not contain uppercase.
|
||||
-I ........ --IGNORE-CASE
|
||||
Ignore case in all searches.
|
||||
-j [_N] .... --jump-target=[_N]
|
||||
Screen position of target lines.
|
||||
-J ........ --status-column
|
||||
Display a status column at left edge of screen.
|
||||
-k [_f_i_l_e] . --lesskey-file=[_f_i_l_e]
|
||||
Use a lesskey file.
|
||||
-L ........ --no-lessopen
|
||||
Ignore the LESSOPEN environment variable.
|
||||
-m -M .... --long-prompt --LONG-PROMPT
|
||||
Set prompt style.
|
||||
-n -N .... --line-numbers --LINE-NUMBERS
|
||||
Don't use line numbers.
|
||||
-o [_f_i_l_e] . --log-file=[_f_i_l_e]
|
||||
Copy to log file (standard input only).
|
||||
-O [_f_i_l_e] . --LOG-FILE=[_f_i_l_e]
|
||||
Copy to log file (unconditionally overwrite).
|
||||
-p [_p_a_t_t_e_r_n] --pattern=[_p_a_t_t_e_r_n]
|
||||
Start at pattern (from command line).
|
||||
-P [_p_r_o_m_p_t] --prompt=[_p_r_o_m_p_t]
|
||||
Define new prompt.
|
||||
-q -Q .... --quiet --QUIET --silent --SILENT
|
||||
Quiet the terminal bell.
|
||||
-r -R .... --raw-control-chars --RAW-CONTROL-CHARS
|
||||
Output "raw" control characters.
|
||||
-s ........ --squeeze-blank-lines
|
||||
Squeeze multiple blank lines.
|
||||
-S ........ --chop-long-lines
|
||||
Chop long lines.
|
||||
-t [_t_a_g] .. --tag=[_t_a_g]
|
||||
Find a tag.
|
||||
-T [_t_a_g_s_f_i_l_e] --tag-file=[_t_a_g_s_f_i_l_e]
|
||||
Use an alternate tags file.
|
||||
-u -U .... --underline-special --UNDERLINE-SPECIAL
|
||||
Change handling of backspaces.
|
||||
-V ........ --version
|
||||
Display the version number of "less".
|
||||
-w ........ --hilite-unread
|
||||
Highlight first new line after forward-screen.
|
||||
-W ........ --HILITE-UNREAD
|
||||
Highlight first new line after any forward movement.
|
||||
-x [_N[,...]] --tabs=[_N[,...]]
|
||||
Set tab stops.
|
||||
-X ........ --no-init
|
||||
Don't use termcap init/deinit strings.
|
||||
--no-keypad
|
||||
Don't use termcap keypad init/deinit strings.
|
||||
-y [_N] .... --max-forw-scroll=[_N]
|
||||
Forward scroll limit.
|
||||
-z [_N] .... --window=[_N]
|
||||
Set size of window.
|
||||
-" [_c[_c]] . --quotes=[_c[_c]]
|
||||
Set shell quote characters.
|
||||
-~ ........ --tilde
|
||||
Don't display tildes after end of file.
|
||||
-# [_N] .... --shift=[_N]
|
||||
Horizontal scroll amount (0 = one half screen width)
|
||||
|
||||
---------------------------------------------------------------------------
|
||||
|
||||
LLIINNEE EEDDIITTIINNGG
|
||||
|
||||
These keys can be used to edit text being entered
|
||||
on the "command line" at the bottom of the screen.
|
||||
|
||||
RightArrow ESC-l Move cursor right one character.
|
||||
LeftArrow ESC-h Move cursor left one character.
|
||||
CNTL-RightArrow ESC-RightArrow ESC-w Move cursor right one word.
|
||||
CNTL-LeftArrow ESC-LeftArrow ESC-b Move cursor left one word.
|
||||
HOME ESC-0 Move cursor to start of line.
|
||||
END ESC-$ Move cursor to end of line.
|
||||
BACKSPACE Delete char to left of cursor.
|
||||
DELETE ESC-x Delete char under cursor.
|
||||
CNTL-BACKSPACE ESC-BACKSPACE Delete word to left of cursor.
|
||||
CNTL-DELETE ESC-DELETE ESC-X Delete word under cursor.
|
||||
CNTL-U ESC (MS-DOS only) Delete entire line.
|
||||
UpArrow ESC-k Retrieve previous command line.
|
||||
DownArrow ESC-j Retrieve next command line.
|
||||
TAB Complete filename & cycle.
|
||||
SHIFT-TAB ESC-TAB Complete filename & reverse cycle.
|
||||
CNTL-L Complete filename, list all.
|
||||
|
||||
|
Loading…
Reference in New Issue