YARN-1637. Implemented a client library for Java users to post timeline entities and events. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1566752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bfd158f323
commit
e74e117ad3
|
@ -119,6 +119,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie
|
YARN-1635. Implemented a Leveldb based ApplicationTimelineStore. (Billie
|
||||||
Rinaldi via zjshen)
|
Rinaldi via zjshen)
|
||||||
|
|
||||||
|
YARN-1637. Implemented a client library for Java users to post timeline
|
||||||
|
entities and events. (zjshen)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
|
||||||
|
|
|
@ -79,6 +79,10 @@
|
||||||
<groupId>org.mortbay.jetty</groupId>
|
<groupId>org.mortbay.jetty</groupId>
|
||||||
<artifactId>jetty-util</artifactId>
|
<artifactId>jetty-util</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.sun.jersey</groupId>
|
||||||
|
<artifactId>jersey-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
|
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client.api;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
|
||||||
|
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A client library that can be used to post some information in terms of a
|
||||||
|
* number of conceptual entities.
|
||||||
|
*
|
||||||
|
* @See ATSEntity
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract class TimelineClient extends AbstractService {
|
||||||
|
|
||||||
|
@Public
|
||||||
|
public static TimelineClient createTimelineClient() {
|
||||||
|
TimelineClient client = new TimelineClientImpl();
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
protected TimelineClient(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Post the information of a number of conceptual entities of an application
|
||||||
|
* to the timeline server. It is a blocking API. The method will not return
|
||||||
|
* until it gets the response from the timeline server.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param entities
|
||||||
|
* the collection of {@link ATSEntity}
|
||||||
|
* @return the error information if the post entities are not correctly stored
|
||||||
|
* @throws IOException
|
||||||
|
* @throws YarnException
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
public abstract ATSPutErrors postEntities(
|
||||||
|
ATSEntity... entities) throws IOException, YarnException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
|
||||||
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.sun.jersey.api.client.Client;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import com.sun.jersey.api.client.config.ClientConfig;
|
||||||
|
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public class TimelineClientImpl extends TimelineClient {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
|
||||||
|
private static final String RESOURCE_URI_STR = "/ws/v1/apptimeline/";
|
||||||
|
private static final Joiner JOINER = Joiner.on("");
|
||||||
|
|
||||||
|
private Client client;
|
||||||
|
private URI resURI;
|
||||||
|
|
||||||
|
public TimelineClientImpl() {
|
||||||
|
super(TimelineClientImpl.class.getName());
|
||||||
|
ClientConfig cc = new DefaultClientConfig();
|
||||||
|
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||||
|
client = Client.create(cc);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
resURI = new URI(JOINER.join(HttpConfig.getSchemePrefix(),
|
||||||
|
HttpConfig.isSecure() ? conf.get(
|
||||||
|
YarnConfiguration.AHS_WEBAPP_HTTPS_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_AHS_WEBAPP_HTTPS_ADDRESS) : conf.get(
|
||||||
|
YarnConfiguration.AHS_WEBAPP_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_AHS_WEBAPP_ADDRESS), RESOURCE_URI_STR));
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ATSPutErrors postEntities(
|
||||||
|
ATSEntity... entities) throws IOException, YarnException {
|
||||||
|
ATSEntities entitiesContainer = new ATSEntities();
|
||||||
|
entitiesContainer.addEntities(Arrays.asList(entities));
|
||||||
|
ClientResponse resp = doPostingEntities(entitiesContainer);
|
||||||
|
if (resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||||
|
String msg =
|
||||||
|
"Failed to get the response from the timeline server.";
|
||||||
|
LOG.error(msg);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
String output = resp.getEntity(String.class);
|
||||||
|
LOG.debug("HTTP error code: " + resp.getStatus()
|
||||||
|
+ " Server response : \n" + output);
|
||||||
|
}
|
||||||
|
throw new YarnException(msg);
|
||||||
|
}
|
||||||
|
return resp.getEntity(ATSPutErrors.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public ClientResponse doPostingEntities(ATSEntities entities) {
|
||||||
|
WebResource webResource = client.resource(resURI);
|
||||||
|
return webResource.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.type(MediaType.APPLICATION_JSON)
|
||||||
|
.post(ClientResponse.class, entities);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,137 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
|
||||||
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
|
||||||
|
public class TestTimelineClient {
|
||||||
|
|
||||||
|
private TimelineClientImpl client;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
client = spy((TimelineClientImpl) TimelineClient.createTimelineClient());
|
||||||
|
client.init(new YarnConfiguration());
|
||||||
|
client.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
client.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPostEntities() throws Exception {
|
||||||
|
mockClientResponse(ClientResponse.Status.OK, false);
|
||||||
|
try {
|
||||||
|
ATSPutErrors errors = client.postEntities(generateATSEntity());
|
||||||
|
Assert.assertEquals(0, errors.getErrors().size());
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPostEntitiesWithError() throws Exception {
|
||||||
|
mockClientResponse(ClientResponse.Status.OK, true);
|
||||||
|
try {
|
||||||
|
ATSPutErrors errors = client.postEntities(generateATSEntity());
|
||||||
|
Assert.assertEquals(1, errors.getErrors().size());
|
||||||
|
Assert.assertEquals("test entity id", errors.getErrors().get(0)
|
||||||
|
.getEntityId());
|
||||||
|
Assert.assertEquals("test entity type", errors.getErrors().get(0)
|
||||||
|
.getEntityType());
|
||||||
|
Assert.assertEquals(ATSPutErrors.ATSPutError.IO_EXCEPTION,
|
||||||
|
errors.getErrors().get(0).getErrorCode());
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("Exception is not expected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPostEntitiesNoResponse() throws Exception {
|
||||||
|
mockClientResponse(ClientResponse.Status.INTERNAL_SERVER_ERROR, false);
|
||||||
|
try {
|
||||||
|
client.postEntities(generateATSEntity());
|
||||||
|
Assert.fail("Exception is expected");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.assertTrue(e.getMessage().contains(
|
||||||
|
"Failed to get the response from the timeline server."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClientResponse mockClientResponse(ClientResponse.Status status,
|
||||||
|
boolean hasError) {
|
||||||
|
ClientResponse response = mock(ClientResponse.class);
|
||||||
|
doReturn(response).when(client)
|
||||||
|
.doPostingEntities(any(ATSEntities.class));
|
||||||
|
when(response.getClientResponseStatus()).thenReturn(status);
|
||||||
|
ATSPutErrors.ATSPutError error = new ATSPutErrors.ATSPutError();
|
||||||
|
error.setEntityId("test entity id");
|
||||||
|
error.setEntityType("test entity type");
|
||||||
|
error.setErrorCode(ATSPutErrors.ATSPutError.IO_EXCEPTION);
|
||||||
|
ATSPutErrors errors = new ATSPutErrors();
|
||||||
|
if (hasError) {
|
||||||
|
errors.addError(error);
|
||||||
|
}
|
||||||
|
when(response.getEntity(ATSPutErrors.class)).thenReturn(errors);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ATSEntity generateATSEntity() {
|
||||||
|
ATSEntity entity = new ATSEntity();
|
||||||
|
entity.setEntityId("entity id");
|
||||||
|
entity.setEntityType("entity type");
|
||||||
|
entity.setStartTime(System.currentTimeMillis());
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
ATSEvent event = new ATSEvent();
|
||||||
|
event.setTimestamp(System.currentTimeMillis());
|
||||||
|
event.setEventType("test event type " + i);
|
||||||
|
event.addEventInfo("key1", "val1");
|
||||||
|
event.addEventInfo("key2", "val2");
|
||||||
|
entity.addEvent(event);
|
||||||
|
}
|
||||||
|
entity.addRelatedEntity("test ref type 1", "test ref id 1");
|
||||||
|
entity.addRelatedEntity("test ref type 2", "test ref id 2");
|
||||||
|
entity.addPrimaryFilter("pkey1", "pval1");
|
||||||
|
entity.addPrimaryFilter("pkey2", "pval2");
|
||||||
|
entity.addOtherInfo("okey1", "oval1");
|
||||||
|
entity.addOtherInfo("okey2", "oval2");
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue