From 4f0c7eaff3b66df6e87aa251e1316a85f38ffab5 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Wed, 25 Feb 2015 02:40:55 -0800 Subject: [PATCH] YARN-3240. Implement client API to put generic entities. Contributed by Zhijie Shen (cherry picked from commit 4487da249f448d5c67b712cd0aa723e764eed77d) --- hadoop-project/pom.xml | 7 ++ .../timelineservice/TimelineEntities.java | 58 ++++++++++ .../hadoop-yarn/hadoop-yarn-common/pom.xml | 1 + .../yarn/client/api/TimelineClient.java | 60 +++++++++- .../client/api/impl/TimelineClientImpl.java | 107 +++++++++++++++--- .../TestTimelineServiceRecords.java | 7 ++ .../hadoop-yarn-server-tests/pom.xml | 12 ++ .../TestTimelineServiceClientIntegration.java | 54 +++++++++ .../pom.xml | 11 ++ .../aggregator/BaseAggregatorService.java | 7 +- .../aggregator/PerNodeAggregatorServer.java | 9 +- .../PerNodeAggregatorWebService.java | 54 +++++---- 12 files changed, 341 insertions(+), 46 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 7c65dad09f9..f97a8827245 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -305,6 +305,13 @@ ${project.version} + + org.apache.hadoop + hadoop-yarn-server-timelineservice + ${project.version} + test-jar + + org.apache.hadoop hadoop-yarn-applications-distributedshell diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java new file mode 100644 index 00000000000..39504cc5b89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.HashSet; +import java.util.Set; + +@XmlRootElement(name = "entities") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineEntities { + + private Set entities = new HashSet<>(); + + public TimelineEntities() { + + } + + @XmlElement(name = "entities") + public Set getEntities() { + return entities; + } + + public void setEntities(Set entities) { + this.entities = entities; + } + + public void addEntities(Set entities) { + this.entities.addAll(entities); + } + + public void addEntity(TimelineEntity entity) { + entities.add(entity); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index 15195531bb2..b060d10d991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -215,6 +215,7 @@ src/main/resources/webapps/jobhistory/.keep src/main/resources/webapps/yarn/.keep src/main/resources/webapps/applicationhistory/.keep + src/main/resources/webapps/timeline/.keep src/main/resources/webapps/cluster/.keep src/main/resources/webapps/test/.keep src/main/resources/webapps/proxy/.keep diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 09298b5991f..9b3e1768335 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -54,15 +57,25 @@ public abstract class TimelineClient extends AbstractService implements * * @return a timeline client */ + protected ApplicationId contextAppId; + protected String timelineServiceAddress; + @Public public static TimelineClient createTimelineClient() { TimelineClient client = new TimelineClientImpl(); return client; } + @Public + public static TimelineClient createTimelineClient(ApplicationId appId) { + TimelineClient client = new TimelineClientImpl(appId); + return client; + } + @Private - protected TimelineClient(String name) { + protected TimelineClient(String name, ApplicationId appId) { super(name); + contextAppId = appId; } /** @@ -187,4 +200,49 @@ public abstract class TimelineClient extends AbstractService implements public abstract void cancelDelegationToken( Token timelineDT) throws IOException, YarnException; + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * aggregator. It is a blocking API. The method will not return until all the + * put entities have been persisted. + *

+ * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * aggregator. It is an asynchronous API. The method will return once all the + * entities are received. + *

+ * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; + + /** + *

+ * Update the timeline service address where the request will be sent to + *

+ * @param address + * the timeline service address + */ + public void setTimelineServiceAddress(String address) { + timelineServiceAddress = address; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 8c600416954..ce4762d3400 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -34,6 +34,9 @@ import java.security.PrivilegedExceptionAction; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -43,8 +46,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.ssl.SSLFactory; @@ -54,6 +57,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -78,13 +82,15 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; @Private @Evolving public class TimelineClientImpl extends TimelineClient { private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); - private static final String RESOURCE_URI_STR = "/ws/v1/timeline/"; + private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/"; + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; private static final Joiner JOINER = Joiner.on(""); public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute @@ -253,7 +259,11 @@ public class TimelineClientImpl extends TimelineClient { } public TimelineClientImpl() { - super(TimelineClientImpl.class.getName()); + super(TimelineClientImpl.class.getName(), null); + } + + public TimelineClientImpl(ApplicationId applicationId) { + super(TimelineClientImpl.class.getName(), applicationId); } protected void serviceInit(Configuration conf) throws Exception { @@ -285,21 +295,19 @@ public class TimelineClientImpl extends TimelineClient { client.addFilter(retryFilter); if (YarnConfiguration.useHttps(conf)) { - resURI = URI - .create(JOINER.join("https://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), - RESOURCE_URI_STR)); + timelineServiceAddress = conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); } else { - resURI = URI.create(JOINER.join("http://", conf.get( + timelineServiceAddress = conf.get( YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), - RESOURCE_URI_STR)); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); } LOG.info("Timeline service address: " + resURI); timelineServiceVersion = conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); + LOG.info("Timeline service address: " + timelineServiceAddress); super.serviceInit(conf); } @@ -341,6 +349,39 @@ public class TimelineClientImpl extends TimelineClient { return timelineWriter.putEntities(entities); } + @Override + public void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + putEntities(false, entities); + } + + @Override + public void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + putEntities(true, entities); + } + + private void putEntities(boolean async, + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities + entitiesContainer = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { + entitiesContainer.addEntity(entity); + } + MultivaluedMap params = new MultivaluedMapImpl(); + if (contextAppId != null) { + params.add("appid", contextAppId.toString()); + } + if (async) { + params.add("async", Boolean.TRUE.toString()); + } + putObjects(constructResURI(getConfig(), timelineServiceAddress, true), + "entities", params, entitiesContainer); + } @Override public void putDomain(TimelineDomain domain) throws IOException, @@ -348,6 +389,36 @@ public class TimelineClientImpl extends TimelineClient { timelineWriter.putDomain(domain); } + private void putObjects( + URI base, String path, MultivaluedMap params, Object obj) + throws IOException, YarnException { + ClientResponse resp; + try { + resp = client.resource(base).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .put(ClientResponse.class, obj); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw new IOException(re); + } + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled() && resp != null) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response:\n" + output); + } + throw new YarnException(msg); + } + } + @SuppressWarnings("unchecked") @Override public Token getDelegationToken( @@ -362,7 +433,8 @@ public class TimelineClientImpl extends TimelineClient { new DelegationTokenAuthenticatedURL(authenticator, connConfigurator); return (Token) authUrl.getDelegationToken( - resURI.toURL(), token, renewer, doAsUser); + constructResURI(getConfig(), timelineServiceAddress, false).toURL(), + token, renewer, doAsUser); } }; return (Token) operateDelegationToken(getDTAction); @@ -397,7 +469,7 @@ public class TimelineClientImpl extends TimelineClient { // the configured service address. final URI serviceURI = isTokenServiceAddrEmpty ? resURI : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); return authUrl .renewDelegationToken(serviceURI.toURL(), token, doAsUser); } @@ -434,7 +506,7 @@ public class TimelineClientImpl extends TimelineClient { // the configured service address. final URI serviceURI = isTokenServiceAddrEmpty ? resURI : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser); return null; } @@ -528,6 +600,13 @@ public class TimelineClientImpl extends TimelineClient { connection.setReadTimeout(socketTimeout); } + private static URI constructResURI( + Configuration conf, String address, boolean v2) { + return URI.create( + JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://", + address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1)); + } + public static void main(String[] argv) throws Exception { CommandLine cliParser = new GnuParser().parse(opts, argv); if (cliParser.hasOption("put")) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 6bab2396472..4f8ab94e446 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -76,6 +76,13 @@ public class TestTimelineServiceRecords { entity.addIsRelatedToEntity("test type 4", "test id 4"); entity.addIsRelatedToEntity("test type 5", "test id 5"); LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true)); + + TimelineEntities entities = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + entities.addEntity(entity1); + TimelineEntity entity2 = new TimelineEntity(); + entities.addEntity(entity2); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true)); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml index 023f6b421f1..22542a090f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml @@ -86,6 +86,18 @@ test
+ + org.apache.hadoop + hadoop-yarn-server-timelineservice + + + + org.apache.hadoop + hadoop-yarn-server-timelineservice + test-jar + test + + org.apache.hadoop hadoop-minikdc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java new file mode 100644 index 00000000000..a5159a2d587 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -0,0 +1,54 @@ +package org.apache.hadoop.yarn.server.timelineservice; + + +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class TestTimelineServiceClientIntegration { + private static PerNodeAggregatorServer server; + + @BeforeClass + public static void setupClass() throws Exception { + try { + server = PerNodeAggregatorServer.launchServer(new String[0]); + server.addApplication(ApplicationId.newInstance(0, 1)); + } catch (ExitUtil.ExitException e) { + fail(); + } + } + + @AfterClass + public static void tearDownClass() throws Exception { + if (server != null) { + server.stop(); + } + } + + @Test + public void testPutEntities() throws Exception { + TimelineClient client = + TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + try { + client.init(new YarnConfiguration()); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + client.putEntities(entity); + client.putEntitiesAsync(entity); + } catch(Exception e) { + fail(); + } finally { + client.stop(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 3154ca31514..26790f10f21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -114,6 +114,17 @@ + + maven-jar-plugin + + + + test-jar + + test-compile + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java index 994c66fcf4f..46e5574c075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -25,8 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; /** * Service that handles writes to the timeline service and writes them to the @@ -70,16 +69,14 @@ public class BaseAggregatorService extends CompositeService { * * @param entities entities to post * @param callerUgi the caller UGI - * @return the response that contains the result of the post. */ - public TimelinePutResponse postEntities(TimelineEntities entities, + public void postEntities(TimelineEntities entities, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - return null; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java index 6371e8277fb..ef30b22b652 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; import java.nio.ByteBuffer; +import com.google.inject.Inject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -39,9 +40,7 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; -import org.apache.hadoop.yarn.webapp.WebApp; -import org.apache.hadoop.yarn.webapp.WebApps; -import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.hadoop.yarn.webapp.*; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -120,6 +119,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService { extends WebApp implements YarnWebParams { @Override public void setup() { + bind(YarnJacksonJaxbJsonProvider.class); + bind(GenericExceptionHandler.class); bind(PerNodeAggregatorWebService.class); // bind to the global singleton bind(AppLevelServiceManager.class). @@ -214,7 +215,7 @@ public class PerNodeAggregatorServer extends AuxiliaryService { } @VisibleForTesting - static PerNodeAggregatorServer launchServer(String[] args) { + public static PerNodeAggregatorServer launchServer(String[] args) { Thread .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java index 2d96699974b..28e6a528550 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java @@ -20,12 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.WebApplicationException; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -40,14 +35,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; import com.google.inject.Inject; import com.google.inject.Singleton; +import java.net.URI; + /** * The main per-node REST end point for timeline service writes. It is * essentially a container service that routes requests to the appropriate @@ -112,11 +110,14 @@ public class PerNodeAggregatorWebService { * the request to the app level aggregator. It expects an application as a * context. */ - @POST + @PUT + @Path("/entities") @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postEntities( + public Response putEntities( @Context HttpServletRequest req, @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, TimelineEntities entities) { init(res); UserGroupInformation callerUgi = getUser(req); @@ -127,13 +128,20 @@ public class PerNodeAggregatorWebService { } // TODO how to express async posts and handle them + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + try { - AppLevelAggregatorService service = getAggregatorService(req); + appId = parseApplicationId(appId); + if (appId == null) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + AppLevelAggregatorService service = serviceManager.getService(appId); if (service == null) { LOG.error("Application not found"); throw new NotFoundException(); // different exception? } - return service.postEntities(entities, callerUgi); + service.postEntities(entities, callerUgi); + return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, @@ -141,16 +149,18 @@ public class PerNodeAggregatorWebService { } } - private AppLevelAggregatorService - getAggregatorService(HttpServletRequest req) { - String appIdString = getApplicationId(req); - return serviceManager.getService(appIdString); - } - - private String getApplicationId(HttpServletRequest req) { - // TODO the application id from the request - // (most likely from the URI) - return null; + private String parseApplicationId(String appId) { + // Make sure the appId is not null and is valid + ApplicationId appID; + try { + if (appId != null) { + return ConverterUtils.toApplicationId(appId.trim()).toString(); + } else { + return null; + } + } catch (Exception e) { + return null; + } } private void init(HttpServletResponse response) {