YARN-1637. Implemented a client library for Java users to post timeline entities and events. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1566752 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1566753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-02-10 21:33:41 +00:00
parent b7c2dece6c
commit f181cf0a3b
5 changed files with 320 additions and 0 deletions

View File

@ -90,6 +90,9 @@ Release 2.4.0 - UNRELEASED
YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie
Rinaldi via zjshen) Rinaldi via zjshen)
YARN-1637. Implemented a client library for Java users to post timeline
entities and events. (zjshen)
IMPROVEMENTS IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -79,6 +79,10 @@
<groupId>org.mortbay.jetty</groupId> <groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId> <artifactId>jetty-util</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency> <dependency>

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* A client library that can be used to post some information in terms of a
* number of conceptual entities.
*
* @See ATSEntity
*/
@Public
@Unstable
public abstract class TimelineClient extends AbstractService {
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();
return client;
}
@Private
protected TimelineClient(String name) {
super(name);
}
/**
* <p>
* Post the information of a number of conceptual entities of an application
* to the timeline server. It is a blocking API. The method will not return
* until it gets the response from the timeline server.
* </p>
*
* @param entities
* the collection of {@link ATSEntity}
* @return the error information if the post entities are not correctly stored
* @throws IOException
* @throws YarnException
*/
@Public
public abstract ATSPutErrors postEntities(
ATSEntity... entities) throws IOException, YarnException;
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import javax.ws.rs.core.MediaType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
@Private
@Unstable
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final String RESOURCE_URI_STR = "/ws/v1/apptimeline/";
private static final Joiner JOINER = Joiner.on("");
private Client client;
private URI resURI;
public TimelineClientImpl() {
super(TimelineClientImpl.class.getName());
ClientConfig cc = new DefaultClientConfig();
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
client = Client.create(cc);
}
protected void serviceInit(Configuration conf) throws Exception {
resURI = new URI(JOINER.join(HttpConfig.getSchemePrefix(),
HttpConfig.isSecure() ? conf.get(
YarnConfiguration.AHS_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS) : conf.get(
YarnConfiguration.AHS_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS), RESOURCE_URI_STR));
super.serviceInit(conf);
}
@Override
public ATSPutErrors postEntities(
ATSEntity... entities) throws IOException, YarnException {
ATSEntities entitiesContainer = new ATSEntities();
entitiesContainer.addEntities(Arrays.asList(entities));
ClientResponse resp = doPostingEntities(entitiesContainer);
if (resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
if (LOG.isDebugEnabled()) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response : \n" + output);
}
throw new YarnException(msg);
}
return resp.getEntity(ATSPutErrors.class);
}
@Private
@VisibleForTesting
public ClientResponse doPostingEntities(ATSEntities entities) {
WebResource webResource = client.resource(resURI);
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, entities);
}
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.sun.jersey.api.client.ClientResponse;
public class TestTimelineClient {
private TimelineClientImpl client;
@Before
public void setup() {
client = spy((TimelineClientImpl) TimelineClient.createTimelineClient());
client.init(new YarnConfiguration());
client.start();
}
@After
public void tearDown() {
client.stop();
}
@Test
public void testPostEntities() throws Exception {
mockClientResponse(ClientResponse.Status.OK, false);
try {
ATSPutErrors errors = client.postEntities(generateATSEntity());
Assert.assertEquals(0, errors.getErrors().size());
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testPostEntitiesWithError() throws Exception {
mockClientResponse(ClientResponse.Status.OK, true);
try {
ATSPutErrors errors = client.postEntities(generateATSEntity());
Assert.assertEquals(1, errors.getErrors().size());
Assert.assertEquals("test entity id", errors.getErrors().get(0)
.getEntityId());
Assert.assertEquals("test entity type", errors.getErrors().get(0)
.getEntityType());
Assert.assertEquals(ATSPutErrors.ATSPutError.IO_EXCEPTION,
errors.getErrors().get(0).getErrorCode());
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
}
@Test
public void testPostEntitiesNoResponse() throws Exception {
mockClientResponse(ClientResponse.Status.INTERNAL_SERVER_ERROR, false);
try {
client.postEntities(generateATSEntity());
Assert.fail("Exception is expected");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(
"Failed to get the response from the timeline server."));
}
}
private ClientResponse mockClientResponse(ClientResponse.Status status,
boolean hasError) {
ClientResponse response = mock(ClientResponse.class);
doReturn(response).when(client)
.doPostingEntities(any(ATSEntities.class));
when(response.getClientResponseStatus()).thenReturn(status);
ATSPutErrors.ATSPutError error = new ATSPutErrors.ATSPutError();
error.setEntityId("test entity id");
error.setEntityType("test entity type");
error.setErrorCode(ATSPutErrors.ATSPutError.IO_EXCEPTION);
ATSPutErrors errors = new ATSPutErrors();
if (hasError) {
errors.addError(error);
}
when(response.getEntity(ATSPutErrors.class)).thenReturn(errors);
return response;
}
private static ATSEntity generateATSEntity() {
ATSEntity entity = new ATSEntity();
entity.setEntityId("entity id");
entity.setEntityType("entity type");
entity.setStartTime(System.currentTimeMillis());
for (int i = 0; i < 2; ++i) {
ATSEvent event = new ATSEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType("test event type " + i);
event.addEventInfo("key1", "val1");
event.addEventInfo("key2", "val2");
entity.addEvent(event);
}
entity.addRelatedEntity("test ref type 1", "test ref id 1");
entity.addRelatedEntity("test ref type 2", "test ref id 2");
entity.addPrimaryFilter("pkey1", "pval1");
entity.addPrimaryFilter("pkey2", "pval2");
entity.addOtherInfo("okey1", "oval1");
entity.addOtherInfo("okey2", "oval2");
return entity;
}
}