+ * Send the information of a number of conceptual entities to the timeline + * aggregator. It is a blocking API. The method will not return until all the + * put entities have been persisted. + *
+ * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; + + /** + *+ * Send the information of a number of conceptual entities to the timeline + * aggregator. It is an asynchronous API. The method will return once all the + * entities are received. + *
+ * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; + + /** + *+ * Update the timeline service address where the request will be sent to + *
+ * @param address + * the timeline service address + */ + public void setTimelineServiceAddress(String address) { + timelineServiceAddress = address; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 8c600416954..ce4762d3400 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -34,6 +34,9 @@ import java.security.PrivilegedExceptionAction; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -43,8 +46,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.ssl.SSLFactory; @@ -54,6 +57,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -78,13 +82,15 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; @Private @Evolving public class TimelineClientImpl extends TimelineClient { private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); - private static final String RESOURCE_URI_STR = "/ws/v1/timeline/"; + private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/"; + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; private static final Joiner JOINER = Joiner.on(""); public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute @@ -253,7 +259,11 @@ public class TimelineClientImpl extends TimelineClient { } public TimelineClientImpl() { - super(TimelineClientImpl.class.getName()); + super(TimelineClientImpl.class.getName(), null); + } + + public TimelineClientImpl(ApplicationId applicationId) { + super(TimelineClientImpl.class.getName(), applicationId); } protected void serviceInit(Configuration conf) throws Exception { @@ -285,21 +295,19 @@ public class TimelineClientImpl extends TimelineClient { client.addFilter(retryFilter); if (YarnConfiguration.useHttps(conf)) { - resURI = URI - .create(JOINER.join("https://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), - RESOURCE_URI_STR)); + timelineServiceAddress = conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); } else { - resURI = URI.create(JOINER.join("http://", conf.get( + timelineServiceAddress = conf.get( YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), - RESOURCE_URI_STR)); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); } LOG.info("Timeline service address: " + resURI); timelineServiceVersion = conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); + LOG.info("Timeline service address: " + timelineServiceAddress); super.serviceInit(conf); } @@ -341,6 +349,39 @@ public class TimelineClientImpl extends TimelineClient { return timelineWriter.putEntities(entities); } + @Override + public void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + putEntities(false, entities); + } + + @Override + public void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + putEntities(true, entities); + } + + private void putEntities(boolean async, + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities + entitiesContainer = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { + entitiesContainer.addEntity(entity); + } + MultivaluedMap