YARN-1687. Renamed user-facing records for the timeline-service to be simply named after 'timeline' instead of 'apptimeline'. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1570922 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2014-02-22 20:55:06 +00:00
parent e06226126c
commit 84425fb435
33 changed files with 764 additions and 726 deletions

View File

@ -221,6 +221,9 @@ Release 2.4.0 - UNRELEASED
YARN-1732. Changed types of related-entities and primary-filters in the
timeline-service to be sets instead of maps. (Billie Rinaldi via vinodkv)
YARN-1687. Renamed user-facing records for the timeline-service to be simply
named after 'timeline' instead of 'apptimeline'. (Zhijie Shen via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import java.util.ArrayList;
import java.util.List;
@ -30,18 +30,18 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* The class that hosts a list of application timeline entities.
* The class that hosts a list of timeline entities.
*/
@XmlRootElement(name = "entities")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class ATSEntities {
public class TimelineEntities {
private List<ATSEntity> entities =
new ArrayList<ATSEntity>();
private List<TimelineEntity> entities =
new ArrayList<TimelineEntity>();
public ATSEntities() {
public TimelineEntities() {
}
@ -51,7 +51,7 @@ public ATSEntities() {
* @return a list of entities
*/
@XmlElement(name = "entities")
public List<ATSEntity> getEntities() {
public List<TimelineEntity> getEntities() {
return entities;
}
@ -61,7 +61,7 @@ public List<ATSEntity> getEntities() {
* @param entity
* a single entity
*/
public void addEntity(ATSEntity entity) {
public void addEntity(TimelineEntity entity) {
entities.add(entity);
}
@ -71,7 +71,7 @@ public void addEntity(ATSEntity entity) {
* @param entities
* a list of entities
*/
public void addEntities(List<ATSEntity> entities) {
public void addEntities(List<TimelineEntity> entities) {
this.entities.addAll(entities);
}
@ -81,7 +81,7 @@ public void addEntities(List<ATSEntity> entities) {
* @param entities
* a list of entities
*/
public void setEntities(List<ATSEntity> entities) {
public void setEntities(List<TimelineEntity> entities) {
this.entities = entities;
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import java.util.ArrayList;
import java.util.HashMap;
@ -36,28 +36,28 @@
/**
* <p>
* The class that contains the the meta information of some conceptual entity of
* an application and its related events. The entity can be an application, an
* application attempt, a container or whatever the user-defined object.
* The class that contains the meta information of some conceptual entity
* and its related events. The entity can be an application, an application
* attempt, a container or any other user-defined object.
* </p>
*
* <p>
* Primary filters will be used to index the entities in
* <code>ApplicationTimelineStore</code>, such that users should carefully
* choose the information they want to store as the primary filters. The
* remaining can be stored as other information.
* <code>TimelineStore</code>, so users should carefully choose the
* information they want to store as the primary filters. The rest can be
* stored as other information.
* </p>
*/
@XmlRootElement(name = "entity")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class ATSEntity implements Comparable<ATSEntity> {
public class TimelineEntity implements Comparable<TimelineEntity> {
private String entityType;
private String entityId;
private Long startTime;
private List<ATSEvent> events = new ArrayList<ATSEvent>();
private List<TimelineEvent> events = new ArrayList<TimelineEvent>();
private Map<String, Set<String>> relatedEntities =
new HashMap<String, Set<String>>();
private Map<String, Set<Object>> primaryFilters =
@ -65,7 +65,7 @@ public class ATSEntity implements Comparable<ATSEntity> {
private Map<String, Object> otherInfo =
new HashMap<String, Object>();
public ATSEntity() {
public TimelineEntity() {
}
@ -135,7 +135,7 @@ public void setStartTime(Long startTime) {
* @return a list of events related to the entity
*/
@XmlElement(name = "events")
public List<ATSEvent> getEvents() {
public List<TimelineEvent> getEvents() {
return events;
}
@ -145,7 +145,7 @@ public List<ATSEvent> getEvents() {
* @param event
* a single event related to the entity
*/
public void addEvent(ATSEvent event) {
public void addEvent(TimelineEvent event) {
events.add(event);
}
@ -155,7 +155,7 @@ public void addEvent(ATSEvent event) {
* @param events
* a list of events related to the entity
*/
public void addEvents(List<ATSEvent> events) {
public void addEvents(List<TimelineEvent> events) {
this.events.addAll(events);
}
@ -165,7 +165,7 @@ public void addEvents(List<ATSEvent> events) {
* @param events
* events a list of events related to the entity
*/
public void setEvents(List<ATSEvent> events) {
public void setEvents(List<TimelineEvent> events) {
this.events = events;
}
@ -203,8 +203,7 @@ public void addRelatedEntity(String entityType, String entityId) {
* a map of related entities
*/
public void addRelatedEntities(Map<String, Set<String>> relatedEntities) {
for (Entry<String, Set<String>> relatedEntity :
relatedEntities.entrySet()) {
for (Entry<String, Set<String>> relatedEntity : relatedEntities.entrySet()) {
Set<String> thisRelatedEntity =
this.relatedEntities.get(relatedEntity.getKey());
if (thisRelatedEntity == null) {
@ -261,8 +260,7 @@ public void addPrimaryFilter(String key, Object value) {
* a map of primary filters
*/
public void addPrimaryFilters(Map<String, Set<Object>> primaryFilters) {
for (Entry<String, Set<Object>> primaryFilter :
primaryFilters.entrySet()) {
for (Entry<String, Set<Object>> primaryFilter : primaryFilters.entrySet()) {
Set<Object> thisPrimaryFilter =
this.primaryFilters.get(primaryFilter.getKey());
if (thisPrimaryFilter == null) {
@ -356,7 +354,7 @@ public boolean equals(Object obj) {
return false;
if (getClass() != obj.getClass())
return false;
ATSEntity other = (ATSEntity) obj;
TimelineEntity other = (TimelineEntity) obj;
if (entityId == null) {
if (other.entityId != null)
return false;
@ -396,7 +394,7 @@ public boolean equals(Object obj) {
}
@Override
public int compareTo(ATSEntity other) {
public int compareTo(TimelineEntity other) {
int comparison = entityType.compareTo(other.entityType);
if (comparison == 0) {
long thisStartTime =
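
As an editorial aside on the renamed record: below is a minimal, hypothetical sketch of building a TimelineEntity with the setters and add* methods shown in this file. The type, id, and values are illustrative only, not part of this commit.

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;

public class TimelineEntityExample {
  public static void main(String[] args) {
    // Any user-defined object can be modeled; type and id are arbitrary strings.
    TimelineEntity entity = new TimelineEntity();
    entity.setEntityType("MY_APPLICATION");
    entity.setEntityId("app_00001");
    entity.setStartTime(System.currentTimeMillis());

    // Primary filters are indexed by the store, so pick them deliberately.
    entity.addPrimaryFilter("user", "alice");
    // Anything that does not need indexing can go into other info.
    entity.addOtherInfo("queue", "default");

    // Attach an event to the entity.
    TimelineEvent event = new TimelineEvent();
    event.setEventType("APP_SUBMITTED");
    event.setTimestamp(System.currentTimeMillis());
    event.addEventInfo("detail", "submitted by alice");
    entity.addEvent(event);

    // Relate this entity to another one by type and id.
    entity.addRelatedEntity("MY_CONTAINER", "container_00001");
  }
}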

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import java.util.HashMap;
import java.util.Map;
@ -39,13 +39,13 @@
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class ATSEvent implements Comparable<ATSEvent> {
public class TimelineEvent implements Comparable<TimelineEvent> {
private long timestamp;
private String eventType;
private Map<String, Object> eventInfo = new HashMap<String, Object>();
public ATSEvent() {
public TimelineEvent() {
}
/**
@ -132,7 +132,7 @@ public void setEventInfo(Map<String, Object> eventInfo) {
}
@Override
public int compareTo(ATSEvent other) {
public int compareTo(TimelineEvent other) {
if (timestamp > other.timestamp) {
return -1;
} else if (timestamp < other.timestamp) {
@ -149,14 +149,14 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass())
return false;
ATSEvent atsEvent = (ATSEvent) o;
TimelineEvent event = (TimelineEvent) o;
if (timestamp != atsEvent.timestamp)
if (timestamp != event.timestamp)
return false;
if (!eventType.equals(atsEvent.eventType))
if (!eventType.equals(event.eventType))
return false;
if (eventInfo != null ? !eventInfo.equals(atsEvent.eventInfo) :
atsEvent.eventInfo != null)
if (eventInfo != null ? !eventInfo.equals(event.eventInfo) :
event.eventInfo != null)
return false;
return true;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import java.util.ArrayList;
import java.util.List;
@ -37,52 +37,52 @@
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class ATSEvents {
public class TimelineEvents {
private List<ATSEventsOfOneEntity> allEvents =
new ArrayList<ATSEventsOfOneEntity>();
private List<EventsOfOneEntity> allEvents =
new ArrayList<EventsOfOneEntity>();
public ATSEvents() {
public TimelineEvents() {
}
/**
* Get a list of {@link ATSEventsOfOneEntity} instances
* Get a list of {@link EventsOfOneEntity} instances
*
* @return a list of {@link ATSEventsOfOneEntity} instances
* @return a list of {@link EventsOfOneEntity} instances
*/
@XmlElement(name = "events")
public List<ATSEventsOfOneEntity> getAllEvents() {
public List<EventsOfOneEntity> getAllEvents() {
return allEvents;
}
/**
* Add a single {@link ATSEventsOfOneEntity} instance into the existing list
* Add a single {@link EventsOfOneEntity} instance into the existing list
*
* @param eventsOfOneEntity
* a single {@link ATSEventsOfOneEntity} instance
* a single {@link EventsOfOneEntity} instance
*/
public void addEvent(ATSEventsOfOneEntity eventsOfOneEntity) {
public void addEvent(EventsOfOneEntity eventsOfOneEntity) {
allEvents.add(eventsOfOneEntity);
}
/**
* Add a list of {@link ATSEventsOfOneEntity} instances into the existing list
* Add a list of {@link EventsOfOneEntity} instances into the existing list
*
* @param allEvents
* a list of {@link ATSEventsOfOneEntity} instances
* a list of {@link EventsOfOneEntity} instances
*/
public void addEvents(List<ATSEventsOfOneEntity> allEvents) {
public void addEvents(List<EventsOfOneEntity> allEvents) {
this.allEvents.addAll(allEvents);
}
/**
* Set the list to the given list of {@link ATSEventsOfOneEntity} instances
* Set the list to the given list of {@link EventsOfOneEntity} instances
*
* @param allEvents
* a list of {@link ATSEventsOfOneEntity} instances
* a list of {@link EventsOfOneEntity} instances
*/
public void setEvents(List<ATSEventsOfOneEntity> allEvents) {
public void setEvents(List<EventsOfOneEntity> allEvents) {
this.allEvents.clear();
this.allEvents.addAll(allEvents);
}
@ -94,13 +94,13 @@ public void setEvents(List<ATSEventsOfOneEntity> allEvents) {
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public static class ATSEventsOfOneEntity {
public static class EventsOfOneEntity {
private String entityId;
private String entityType;
private List<ATSEvent> events = new ArrayList<ATSEvent>();
private List<TimelineEvent> events = new ArrayList<TimelineEvent>();
public ATSEventsOfOneEntity() {
public EventsOfOneEntity() {
}
@ -150,7 +150,7 @@ public void setEntityType(String entityType) {
* @return a list of events
*/
@XmlElement(name = "events")
public List<ATSEvent> getEvents() {
public List<TimelineEvent> getEvents() {
return events;
}
@ -160,7 +160,7 @@ public List<ATSEvent> getEvents() {
* @param event
* a single event
*/
public void addEvent(ATSEvent event) {
public void addEvent(TimelineEvent event) {
events.add(event);
}
@ -170,7 +170,7 @@ public void addEvent(ATSEvent event) {
* @param events
* a list of events
*/
public void addEvents(List<ATSEvent> events) {
public void addEvents(List<TimelineEvent> events) {
this.events.addAll(events);
}
@ -180,7 +180,7 @@ public void addEvents(List<ATSEvent> events) {
* @param events
* a list of events
*/
public void setEvents(List<ATSEvent> events) {
public void setEvents(List<TimelineEvent> events) {
this.events = events;
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -28,60 +28,60 @@
import java.util.List;
/**
* A class that holds a list of put errors. This is the response returned
* when a list of {@link ATSEntity} objects is added to the application
* timeline. If there are errors in storing individual entity objects,
* they will be indicated in the list of errors.
* A class that holds a list of put errors. This is the response returned when a
* list of {@link TimelineEntity} objects is added to the timeline. If there are errors
* in storing individual entity objects, they will be indicated in the list of
* errors.
*/
@XmlRootElement(name = "errors")
@XmlRootElement(name = "response")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class ATSPutErrors {
public class TimelinePutResponse {
private List<ATSPutError> errors = new ArrayList<ATSPutError>();
private List<TimelinePutError> errors = new ArrayList<TimelinePutError>();
public ATSPutErrors() {
public TimelinePutResponse() {
}
/**
* Get a list of {@link ATSPutError} instances
*
* @return a list of {@link ATSPutError} instances
* Get a list of {@link TimelinePutError} instances
*
* @return a list of {@link TimelinePutError} instances
*/
@XmlElement(name = "errors")
public List<ATSPutError> getErrors() {
public List<TimelinePutError> getErrors() {
return errors;
}
/**
* Add a single {@link ATSPutError} instance into the existing list
*
* Add a single {@link TimelinePutError} instance into the existing list
*
* @param error
* a single {@link ATSPutError} instance
* a single {@link TimelinePutError} instance
*/
public void addError(ATSPutError error) {
public void addError(TimelinePutError error) {
errors.add(error);
}
/**
* Add a list of {@link ATSPutError} instances into the existing list
*
* Add a list of {@link TimelinePutError} instances into the existing list
*
* @param errors
* a list of {@link ATSPutError} instances
* a list of {@link TimelinePutError} instances
*/
public void addErrors(List<ATSPutError> errors) {
public void addErrors(List<TimelinePutError> errors) {
this.errors.addAll(errors);
}
/**
* Set the list to the given list of {@link ATSPutError} instances
*
* Set the list to the given list of {@link TimelinePutError} instances
*
* @param errors
* a list of {@link ATSPutError} instances
* a list of {@link TimelinePutError} instances
*/
public void setErrors(List<ATSPutError> errors) {
public void setErrors(List<TimelinePutError> errors) {
this.errors.clear();
this.errors.addAll(errors);
}
@ -93,11 +93,12 @@ public void setErrors(List<ATSPutError> errors) {
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public static class ATSPutError {
public static class TimelinePutError {
/**
* Error code returned when no start time can be found when putting an
* entity. This occurs when the entity does not already exist in the
* store and it is put with no start time or events specified.
* entity. This occurs when the entity does not already exist in the store
* and it is put with no start time or events specified.
*/
public static final int NO_START_TIME = 1;
/**
@ -112,7 +113,7 @@ public static class ATSPutError {
/**
* Get the entity Id
*
*
* @return the entity Id
*/
@XmlElement(name = "entity")
@ -122,7 +123,7 @@ public String getEntityId() {
/**
* Set the entity Id
*
*
* @param entityId
* the entity Id
*/
@ -132,7 +133,7 @@ public void setEntityId(String entityId) {
/**
* Get the entity type
*
*
* @return the entity type
*/
@XmlElement(name = "entitytype")
@ -142,7 +143,7 @@ public String getEntityType() {
/**
* Set the entity type
*
*
* @param entityType
* the entity type
*/
@ -152,7 +153,7 @@ public void setEntityType(String entityType) {
/**
* Get the error code
*
*
* @return an error code
*/
@XmlElement(name = "errorcode")
@ -162,7 +163,7 @@ public int getErrorCode() {
/**
* Set the error code to the given error code
*
*
* @param errorCode
* an error code
*/
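
To make the renamed response record concrete, here is a short hypothetical sketch of filling and reading a TimelinePutResponse. The ids and types are made up; the error code is one of the constants defined in this class.

import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;

public class TimelinePutResponseExample {
  public static void main(String[] args) {
    TimelinePutResponse response = new TimelinePutResponse();

    // Record that one entity could not be stored because it had no start time.
    TimelinePutError error = new TimelinePutError();
    error.setEntityId("app_00001");
    error.setEntityType("MY_APPLICATION");
    error.setErrorCode(TimelinePutError.NO_START_TIME);
    response.addError(error);

    // Callers typically walk the error list after a put.
    for (TimelinePutError e : response.getErrors()) {
      System.out.println(e.getEntityType() + "/" + e.getEntityId()
          + " failed with error code " + e.getErrorCode());
    }
  }
}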

View File

@ -16,6 +16,6 @@
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -1051,17 +1051,19 @@ public class YarnConfiguration extends Configuration {
AHS_PREFIX + "webapp.spnego-keytab-file";
////////////////////////////////
// ATS Configs
// Timeline Service Configs
////////////////////////////////
public static final String ATS_PREFIX = YARN_PREFIX + "ats.";
public static final String TIMELINE_SERVICE_PREFIX =
YARN_PREFIX + "timeline-service.";
/** ATS store class */
public static final String ATS_STORE = ATS_PREFIX + "store.class";
/** Timeline service store class */
public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class";
/** ATS leveldb path */
public static final String ATS_LEVELDB_PATH_PROPERTY =
ATS_PREFIX + "leveldb-apptimeline-store.path";
/** Timeline service leveldb path */
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.path";
////////////////////////////////
// Other Configs

View File

@ -24,8 +24,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -33,7 +33,7 @@
* A client library that can be used to post some information in terms of a
* number of conceptual entities.
*
* @See ATSEntity
* @see TimelineEntity
*/
@Public
@Unstable
@ -52,19 +52,19 @@ protected TimelineClient(String name) {
/**
* <p>
* Post the information of a number of conceptual entities of an application
* to the timeline server. It is a blocking API. The method will not return
* until it gets the response from the timeline server.
* Send the information of a number of conceptual entities to the timeline
* server. It is a blocking API. The method will not return until it gets the
* response from the timeline server.
* </p>
*
* @param entities
* the collection of {@link ATSEntity}
* @return the error information if the post entities are not correctly stored
* the collection of {@link TimelineEntity}
* @return the error information if the sent entities are not correctly stored
* @throws IOException
* @throws YarnException
*/
@Public
public abstract ATSPutErrors postEntities(
ATSEntity... entities) throws IOException, YarnException;
public abstract TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException;
}
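
A hedged end-to-end sketch of the renamed client API follows; it assumes a static factory TimelineClient.createTimelineClient(), which is not part of this hunk, so treat that call (and the entity values) as illustrative assumptions rather than the committed code.

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineClientExample {
  public static void main(String[] args) throws Exception {
    // Assumed factory; only the abstract class is shown in the hunk above.
    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(new YarnConfiguration());
    client.start();
    try {
      TimelineEntity entity = new TimelineEntity();
      entity.setEntityType("MY_APPLICATION");
      entity.setEntityId("app_00001");
      entity.setStartTime(System.currentTimeMillis());

      // Blocking call: returns only after the timeline server responds.
      TimelinePutResponse response = client.putEntities(entity);
      if (!response.getErrors().isEmpty()) {
        System.err.println(response.getErrors().size()
            + " entities were not stored correctly");
      }
    } finally {
      client.stop();
    }
  }
}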

View File

@ -29,9 +29,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -50,7 +50,7 @@
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final String RESOURCE_URI_STR = "/ws/v1/apptimeline/";
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
private static final Joiner JOINER = Joiner.on("");
private Client client;
@ -79,9 +79,9 @@ protected void serviceInit(Configuration conf) throws Exception {
}
@Override
public ATSPutErrors postEntities(
ATSEntity... entities) throws IOException, YarnException {
ATSEntities entitiesContainer = new ATSEntities();
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
TimelineEntities entitiesContainer = new TimelineEntities();
entitiesContainer.addEntities(Arrays.asList(entities));
ClientResponse resp = doPostingEntities(entitiesContainer);
if (resp.getClientResponseStatus() != ClientResponse.Status.OK) {
@ -95,12 +95,12 @@ public ATSPutErrors postEntities(
}
throw new YarnException(msg);
}
return resp.getEntity(ATSPutErrors.class);
return resp.getEntity(TimelinePutResponse.class);
}
@Private
@VisibleForTesting
public ClientResponse doPostingEntities(ATSEntities entities) {
public ClientResponse doPostingEntities(TimelineEntities entities) {
WebResource webResource = client.resource(resURI);
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)

View File

@ -25,10 +25,10 @@
import static org.mockito.Mockito.when;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -58,8 +58,8 @@ public void tearDown() {
public void testPostEntities() throws Exception {
mockClientResponse(ClientResponse.Status.OK, false);
try {
ATSPutErrors errors = client.postEntities(generateATSEntity());
Assert.assertEquals(0, errors.getErrors().size());
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(0, response.getErrors().size());
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
@ -69,14 +69,14 @@ public void testPostEntities() throws Exception {
public void testPostEntitiesWithError() throws Exception {
mockClientResponse(ClientResponse.Status.OK, true);
try {
ATSPutErrors errors = client.postEntities(generateATSEntity());
Assert.assertEquals(1, errors.getErrors().size());
Assert.assertEquals("test entity id", errors.getErrors().get(0)
TimelinePutResponse response = client.putEntities(generateEntity());
Assert.assertEquals(1, response.getErrors().size());
Assert.assertEquals("test entity id", response.getErrors().get(0)
.getEntityId());
Assert.assertEquals("test entity type", errors.getErrors().get(0)
Assert.assertEquals("test entity type", response.getErrors().get(0)
.getEntityType());
Assert.assertEquals(ATSPutErrors.ATSPutError.IO_EXCEPTION,
errors.getErrors().get(0).getErrorCode());
Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
response.getErrors().get(0).getErrorCode());
} catch (YarnException e) {
Assert.fail("Exception is not expected");
}
@ -86,7 +86,7 @@ public void testPostEntitiesWithError() throws Exception {
public void testPostEntitiesNoResponse() throws Exception {
mockClientResponse(ClientResponse.Status.INTERNAL_SERVER_ERROR, false);
try {
client.postEntities(generateATSEntity());
client.putEntities(generateEntity());
Assert.fail("Exception is expected");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(
@ -98,27 +98,28 @@ private ClientResponse mockClientResponse(ClientResponse.Status status,
boolean hasError) {
ClientResponse response = mock(ClientResponse.class);
doReturn(response).when(client)
.doPostingEntities(any(ATSEntities.class));
.doPostingEntities(any(TimelineEntities.class));
when(response.getClientResponseStatus()).thenReturn(status);
ATSPutErrors.ATSPutError error = new ATSPutErrors.ATSPutError();
TimelinePutResponse.TimelinePutError error =
new TimelinePutResponse.TimelinePutError();
error.setEntityId("test entity id");
error.setEntityType("test entity type");
error.setErrorCode(ATSPutErrors.ATSPutError.IO_EXCEPTION);
ATSPutErrors errors = new ATSPutErrors();
error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);
TimelinePutResponse putResponse = new TimelinePutResponse();
if (hasError) {
errors.addError(error);
putResponse.addError(error);
}
when(response.getEntity(ATSPutErrors.class)).thenReturn(errors);
when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse);
return response;
}
private static ATSEntity generateATSEntity() {
ATSEntity entity = new ATSEntity();
private static TimelineEntity generateEntity() {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId("entity id");
entity.setEntityType("entity type");
entity.setStartTime(System.currentTimeMillis());
for (int i = 0; i < 2; ++i) {
ATSEvent event = new ATSEvent();
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType("test event type " + i);
event.addEventInfo("key1", "val1");

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
package org.apache.hadoop.yarn.util.timeline;
import java.io.IOException;

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.util.timeline;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -1140,18 +1140,18 @@
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore</value>
</property>
<!-- Application Timeline Service's Configuration-->
<!-- Timeline Service's Configuration-->
<property>
<description>Store class name for application timeline store</description>
<name>yarn.ats.store.class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore</value>
<description>Store class name for timeline store</description>
<name>yarn.timeline-service.store-class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore</value>
</property>
<property>
<description>Store file name for leveldb application timeline store</description>
<name>yarn.ats.leveldb-apptimeline-store.path</name>
<value>${yarn.log.dir}/ats</value>
<description>Store file name for leveldb timeline store</description>
<name>yarn.timeline-service.leveldb-timeline-store.path</name>
<value>${yarn.log.dir}/timeline</value>
</property>
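
For illustration, a hedged sketch of overriding the two renamed properties above programmatically; the constants come from YarnConfiguration earlier in this commit, and the leveldb path value is just an example.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineConfigExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Equivalent to setting yarn.timeline-service.store-class in yarn-site.xml.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
        "org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore");
    // Equivalent to yarn.timeline-service.leveldb-timeline-store.path; example path only.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, "/tmp/timeline");
    System.out.println(conf.get(YarnConfiguration.TIMELINE_SERVICE_STORE));
  }
}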
<!-- Other configuration -->

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
package org.apache.hadoop.yarn.api.records.timeline;
import java.util.ArrayList;
import java.util.List;
@ -25,25 +25,30 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.util.TimelineUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Test;
public class TestApplicationTimelineRecords {
public class TestTimelineRecords {
private static final Log LOG =
LogFactory.getLog(TestApplicationTimelineRecords.class);
LogFactory.getLog(TestTimelineRecords.class);
@Test
public void testATSEntities() throws Exception {
ATSEntities entities = new ATSEntities();
public void testEntities() throws Exception {
TimelineEntities entities = new TimelineEntities();
for (int j = 0; j < 2; ++j) {
ATSEntity entity = new ATSEntity();
TimelineEntity entity = new TimelineEntity();
entity.setEntityId("entity id " + j);
entity.setEntityType("entity type " + j);
entity.setStartTime(System.currentTimeMillis());
for (int i = 0; i < 2; ++i) {
ATSEvent event = new ATSEvent();
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType("event type " + i);
event.addEventInfo("key1", "val1");
@ -62,14 +67,14 @@ public void testATSEntities() throws Exception {
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
Assert.assertEquals(2, entities.getEntities().size());
ATSEntity entity1 = entities.getEntities().get(0);
TimelineEntity entity1 = entities.getEntities().get(0);
Assert.assertEquals("entity id 0", entity1.getEntityId());
Assert.assertEquals("entity type 0", entity1.getEntityType());
Assert.assertEquals(2, entity1.getRelatedEntities().size());
Assert.assertEquals(2, entity1.getEvents().size());
Assert.assertEquals(2, entity1.getPrimaryFilters().size());
Assert.assertEquals(2, entity1.getOtherInfo().size());
ATSEntity entity2 = entities.getEntities().get(1);
TimelineEntity entity2 = entities.getEntities().get(1);
Assert.assertEquals("entity id 1", entity2.getEntityId());
Assert.assertEquals("entity type 1", entity2.getEntityType());
Assert.assertEquals(2, entity2.getRelatedEntities().size());
@ -79,15 +84,15 @@ public void testATSEntities() throws Exception {
}
@Test
public void testATSEvents() throws Exception {
ATSEvents events = new ATSEvents();
public void testEvents() throws Exception {
TimelineEvents events = new TimelineEvents();
for (int j = 0; j < 2; ++j) {
ATSEvents.ATSEventsOfOneEntity partEvents =
new ATSEvents.ATSEventsOfOneEntity();
TimelineEvents.EventsOfOneEntity partEvents =
new TimelineEvents.EventsOfOneEntity();
partEvents.setEntityId("entity id " + j);
partEvents.setEntityType("entity type " + j);
for (int i = 0; i < 2; ++i) {
ATSEvent event = new ATSEvent();
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType("event type " + i);
event.addEventInfo("key1", "val1");
@ -100,57 +105,57 @@ public void testATSEvents() throws Exception {
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(events, true));
Assert.assertEquals(2, events.getAllEvents().size());
ATSEvents.ATSEventsOfOneEntity partEvents1 = events.getAllEvents().get(0);
TimelineEvents.EventsOfOneEntity partEvents1 = events.getAllEvents().get(0);
Assert.assertEquals("entity id 0", partEvents1.getEntityId());
Assert.assertEquals("entity type 0", partEvents1.getEntityType());
Assert.assertEquals(2, partEvents1.getEvents().size());
ATSEvent event11 = partEvents1.getEvents().get(0);
TimelineEvent event11 = partEvents1.getEvents().get(0);
Assert.assertEquals("event type 0", event11.getEventType());
Assert.assertEquals(2, event11.getEventInfo().size());
ATSEvent event12 = partEvents1.getEvents().get(1);
TimelineEvent event12 = partEvents1.getEvents().get(1);
Assert.assertEquals("event type 1", event12.getEventType());
Assert.assertEquals(2, event12.getEventInfo().size());
ATSEvents.ATSEventsOfOneEntity partEvents2 = events.getAllEvents().get(1);
TimelineEvents.EventsOfOneEntity partEvents2 = events.getAllEvents().get(1);
Assert.assertEquals("entity id 1", partEvents2.getEntityId());
Assert.assertEquals("entity type 1", partEvents2.getEntityType());
Assert.assertEquals(2, partEvents2.getEvents().size());
ATSEvent event21 = partEvents2.getEvents().get(0);
TimelineEvent event21 = partEvents2.getEvents().get(0);
Assert.assertEquals("event type 0", event21.getEventType());
Assert.assertEquals(2, event21.getEventInfo().size());
ATSEvent event22 = partEvents2.getEvents().get(1);
TimelineEvent event22 = partEvents2.getEvents().get(1);
Assert.assertEquals("event type 1", event22.getEventType());
Assert.assertEquals(2, event22.getEventInfo().size());
}
@Test
public void testATSPutErrors() throws Exception {
ATSPutErrors atsPutErrors = new ATSPutErrors();
ATSPutError error1 = new ATSPutError();
public void testTimelinePutErrors() throws Exception {
TimelinePutResponse putErrors = new TimelinePutResponse();
TimelinePutError error1 = new TimelinePutError();
error1.setEntityId("entity id 1");
error1.setEntityId("entity type 1");
error1.setErrorCode(ATSPutError.NO_START_TIME);
atsPutErrors.addError(error1);
List<ATSPutError> errors = new ArrayList<ATSPutError>();
errors.add(error1);
ATSPutError error2 = new ATSPutError();
error1.setErrorCode(TimelinePutError.NO_START_TIME);
putErrors.addError(error1);
List<TimelinePutError> response = new ArrayList<TimelinePutError>();
response.add(error1);
TimelinePutError error2 = new TimelinePutError();
error2.setEntityId("entity id 2");
error2.setEntityId("entity type 2");
error2.setErrorCode(ATSPutError.IO_EXCEPTION);
errors.add(error2);
atsPutErrors.addErrors(errors);
error2.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.add(error2);
putErrors.addErrors(response);
LOG.info("Errors in JSON:");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(atsPutErrors, true));
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(putErrors, true));
Assert.assertEquals(3, atsPutErrors.getErrors().size());
ATSPutError e = atsPutErrors.getErrors().get(0);
Assert.assertEquals(3, putErrors.getErrors().size());
TimelinePutError e = putErrors.getErrors().get(0);
Assert.assertEquals(error1.getEntityId(), e.getEntityId());
Assert.assertEquals(error1.getEntityType(), e.getEntityType());
Assert.assertEquals(error1.getErrorCode(), e.getErrorCode());
e = atsPutErrors.getErrors().get(1);
e = putErrors.getErrors().get(1);
Assert.assertEquals(error1.getEntityId(), e.getEntityId());
Assert.assertEquals(error1.getEntityType(), e.getEntityType());
Assert.assertEquals(error1.getErrorCode(), e.getErrorCode());
e = atsPutErrors.getErrors().get(2);
e = putErrors.getErrors().get(2);
Assert.assertEquals(error2.getEntityId(), e.getEntityId());
Assert.assertEquals(error2.getEntityType(), e.getEntityType());
Assert.assertEquals(error2.getErrorCode(), e.getErrorCode());

View File

@ -33,8 +33,8 @@
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.LeveldbApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@ -54,7 +54,7 @@ public class ApplicationHistoryServer extends CompositeService {
ApplicationHistoryClientService ahsClientService;
ApplicationHistoryManager historyManager;
ApplicationTimelineStore timelineStore;
TimelineStore timelineStore;
private WebApp webApp;
public ApplicationHistoryServer() {
@ -67,7 +67,7 @@ protected void serviceInit(Configuration conf) throws Exception {
ahsClientService = createApplicationHistoryClientService(historyManager);
addService(ahsClientService);
addService((Service) historyManager);
timelineStore = createApplicationTimelineStore(conf);
timelineStore = createTimelineStore(conf);
addIfService(timelineStore);
super.serviceInit(conf);
}
@ -141,11 +141,11 @@ protected ApplicationHistoryManager createApplicationHistoryManager(
return new ApplicationHistoryManagerImpl();
}
protected ApplicationTimelineStore createApplicationTimelineStore(
protected TimelineStore createTimelineStore(
Configuration conf) {
return ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.ATS_STORE, LeveldbApplicationTimelineStore.class,
ApplicationTimelineStore.class), conf);
YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
TimelineStore.class), conf);
}
protected void startWebApp() {

View File

@ -1,131 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
/**
* This interface is for retrieving application timeline information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ApplicationTimelineReader {
/**
* Possible fields to retrieve for {@link #getEntities} and {@link
* #getEntity}.
*/
enum Field {
EVENTS,
RELATED_ENTITIES,
PRIMARY_FILTERS,
OTHER_INFO,
LAST_EVENT_ONLY
}
/**
* Default limit for {@link #getEntities} and {@link #getEntityTimelines}.
*/
final long DEFAULT_LIMIT = 100;
/**
* This method retrieves a list of entity information, {@link ATSEntity},
* sorted by the starting timestamp for the entity, descending.
*
* @param entityType The type of entities to return (required).
* @param limit A limit on the number of entities to return. If null,
* defaults to {@link #DEFAULT_LIMIT}.
* @param windowStart The earliest start timestamp to retrieve (exclusive).
* If null, defaults to retrieving all entities until the
* limit is reached.
* @param windowEnd The latest start timestamp to retrieve (inclusive).
* If null, defaults to {@link Long#MAX_VALUE}
* @param primaryFilter Retrieves only entities that have the specified
* primary filter. If null, retrieves all entities.
* This is an indexed retrieval, and no entities that
* do not match the filter are scanned.
* @param secondaryFilters Retrieves only entities that have exact matches
* for all the specified filters in their primary
* filters or other info. This is not an indexed
* retrieval, so all entities are scanned but only
* those matching the filters are returned.
* @param fieldsToRetrieve Specifies which fields of the entity object to
* retrieve (see {@link Field}). If the set of fields
* contains {@link Field#LAST_EVENT_ONLY} and not
* {@link Field#EVENTS}, the most recent event for
* each entity is retrieved. If null, retrieves all
* fields.
* @return An {@link ATSEntities} object.
* @throws IOException
*/
ATSEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
* This method retrieves the entity information for a given entity.
*
* @param entity The entity whose information will be retrieved.
* @param entityType The type of the entity.
* @param fieldsToRetrieve Specifies which fields of the entity object to
* retrieve (see {@link Field}). If the set of
* fields contains {@link Field#LAST_EVENT_ONLY} and
* not {@link Field#EVENTS}, the most recent event
* for each entity is retrieved. If null, retrieves
* all fields.
* @return An {@link ATSEntity} object.
* @throws IOException
*/
ATSEntity getEntity(String entity, String entityType, EnumSet<Field>
fieldsToRetrieve) throws IOException;
/**
* This method retrieves the events for a list of entities all of the same
* entity type. The events for each entity are sorted in order of their
* timestamps, descending.
*
* @param entityType The type of entities to retrieve events for.
* @param entityIds The entity IDs to retrieve events for.
* @param limit A limit on the number of events to return for each entity.
* If null, defaults to {@link #DEFAULT_LIMIT} events per
* entity.
* @param windowStart If not null, retrieves only events later than the
* given time (exclusive)
* @param windowEnd If not null, retrieves only events earlier than the
* given time (inclusive)
* @param eventTypes Restricts the events returned to the given types. If
* null, events of all types will be returned.
* @return An {@link ATSEvents} object.
* @throws IOException
*/
ATSEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventTypes) throws IOException;
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -37,7 +37,7 @@
* objects. The {@link #write(Object)}, {@link #read(byte[])} and {@link
* #write(java.io.DataOutputStream, Object)}, {@link
* #read(java.io.DataInputStream)} methods are used by the
* {@link LeveldbApplicationTimelineStore} to store and retrieve arbitrary
* {@link LeveldbTimelineStore} to store and retrieve arbitrary
* JSON, while the {@link #writeReverseOrderedLong} and {@link
* #readReverseOrderedLong} methods are used to sort entities in descending
* start time order.
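
A small, hypothetical round trip illustrating the ordering property described above, assuming the static helpers keep the signatures used elsewhere in this diff (write, read, writeReverseOrderedLong, readReverseOrderedLong).

import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;

public class GenericObjectMapperExample {
  public static void main(String[] args) throws Exception {
    // Arbitrary JSON-friendly objects are serialized to bytes and read back.
    byte[] bytes = GenericObjectMapper.write("some value");
    System.out.println(GenericObjectMapper.read(bytes));

    // A later timestamp encodes to bytes that sort lexicographically before an
    // earlier one, which is how entities end up in descending start-time order.
    byte[] later = GenericObjectMapper.writeReverseOrderedLong(2000L);
    byte[] earlier = GenericObjectMapper.writeReverseOrderedLong(1000L);
    System.out.println(GenericObjectMapper.readReverseOrderedLong(later, 0));   // 2000
    System.out.println(GenericObjectMapper.readReverseOrderedLong(earlier, 0)); // 1000
  }
}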

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -44,13 +44,13 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
@ -58,22 +58,20 @@
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import static org.apache.hadoop.yarn.server.applicationhistoryservice
.apptimeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.applicationhistoryservice
.apptimeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
/**
* An implementation of an application timeline store backed by leveldb.
* An implementation of a timeline store backed by leveldb.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LeveldbApplicationTimelineStore extends AbstractService
implements ApplicationTimelineStore {
public class LeveldbTimelineStore extends AbstractService
implements TimelineStore {
private static final Log LOG = LogFactory
.getLog(LeveldbApplicationTimelineStore.class);
.getLog(LeveldbTimelineStore.class);
private static final String FILENAME = "leveldb-apptimeline-store.ldb";
private static final String FILENAME = "leveldb-timeline-store.ldb";
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
@ -94,8 +92,8 @@ public class LeveldbApplicationTimelineStore extends AbstractService
private DB db;
public LeveldbApplicationTimelineStore() {
super(LeveldbApplicationTimelineStore.class.getName());
public LeveldbTimelineStore() {
super(LeveldbTimelineStore.class.getName());
}
@Override
@ -103,12 +101,12 @@ protected void serviceInit(Configuration conf) throws Exception {
Options options = new Options();
options.createIfMissing(true);
JniDBFactory factory = new JniDBFactory();
String path = conf.get(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY);
String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
File p = new File(path);
if (!p.exists())
if (!p.mkdirs())
throw new IOException("Couldn't create directory for leveldb " +
"application timeline store " + path);
"timeline store " + path);
LOG.info("Using leveldb path " + path);
db = factory.open(new File(path, FILENAME), options);
super.serviceInit(conf);
@ -212,20 +210,20 @@ public int getOffset() {
}
@Override
public ATSEntity getEntity(String entity, String entityType,
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fields) throws IOException {
DBIterator iterator = null;
try {
byte[] revStartTime = getStartTime(entity, entityType, null, null, null);
byte[] revStartTime = getStartTime(entityId, entityType, null, null, null);
if (revStartTime == null)
return null;
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entity).getBytesForLookup();
.add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
iterator = db.iterator();
iterator.seek(prefix);
return getEntity(entity, entityType,
return getEntity(entityId, entityType,
readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix,
prefix.length);
} finally {
@ -237,43 +235,43 @@ public ATSEntity getEntity(String entity, String entityType,
* Read entity from a db iterator. If no information is found in the
* specified fields for this entity, return null.
*/
private static ATSEntity getEntity(String entity, String entityType,
private static TimelineEntity getEntity(String entityId, String entityType,
Long startTime, EnumSet<Field> fields, DBIterator iterator,
byte[] prefix, int prefixlen) throws IOException {
if (fields == null)
fields = EnumSet.allOf(Field.class);
ATSEntity atsEntity = new ATSEntity();
TimelineEntity entity = new TimelineEntity();
boolean events = false;
boolean lastEvent = false;
if (fields.contains(Field.EVENTS)) {
events = true;
atsEntity.setEvents(new ArrayList<ATSEvent>());
entity.setEvents(new ArrayList<TimelineEvent>());
} else if (fields.contains(Field.LAST_EVENT_ONLY)) {
lastEvent = true;
atsEntity.setEvents(new ArrayList<ATSEvent>());
entity.setEvents(new ArrayList<TimelineEvent>());
}
else {
atsEntity.setEvents(null);
entity.setEvents(null);
}
boolean relatedEntities = false;
if (fields.contains(Field.RELATED_ENTITIES)) {
relatedEntities = true;
} else {
atsEntity.setRelatedEntities(null);
entity.setRelatedEntities(null);
}
boolean primaryFilters = false;
if (fields.contains(Field.PRIMARY_FILTERS)) {
primaryFilters = true;
} else {
atsEntity.setPrimaryFilters(null);
entity.setPrimaryFilters(null);
}
boolean otherInfo = false;
if (fields.contains(Field.OTHER_INFO)) {
otherInfo = true;
atsEntity.setOtherInfo(new HashMap<String, Object>());
entity.setOtherInfo(new HashMap<String, Object>());
} else {
atsEntity.setOtherInfo(null);
entity.setOtherInfo(null);
}
// iterate through the entity's entry, parsing information if it is part
@ -284,48 +282,48 @@ private static ATSEntity getEntity(String entity, String entityType,
break;
if (key[prefixlen] == PRIMARY_FILTER_COLUMN[0]) {
if (primaryFilters) {
addPrimaryFilter(atsEntity, key,
addPrimaryFilter(entity, key,
prefixlen + PRIMARY_FILTER_COLUMN.length);
}
} else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
if (otherInfo) {
atsEntity.addOtherInfo(parseRemainingKey(key,
entity.addOtherInfo(parseRemainingKey(key,
prefixlen + OTHER_INFO_COLUMN.length),
GenericObjectMapper.read(iterator.peekNext().getValue()));
}
} else if (key[prefixlen] == RELATED_COLUMN[0]) {
if (relatedEntities) {
addRelatedEntity(atsEntity, key,
addRelatedEntity(entity, key,
prefixlen + RELATED_COLUMN.length);
}
} else if (key[prefixlen] == TIME_COLUMN[0]) {
if (events || (lastEvent && atsEntity.getEvents().size() == 0)) {
ATSEvent event = getEntityEvent(null, key, prefixlen +
if (events || (lastEvent && entity.getEvents().size() == 0)) {
TimelineEvent event = getEntityEvent(null, key, prefixlen +
TIME_COLUMN.length, iterator.peekNext().getValue());
if (event != null) {
atsEntity.addEvent(event);
entity.addEvent(event);
}
}
} else {
LOG.warn(String.format("Found unexpected column for entity %s of " +
"type %s (0x%02x)", entity, entityType, key[prefixlen]));
"type %s (0x%02x)", entityId, entityType, key[prefixlen]));
}
}
atsEntity.setEntityId(entity);
atsEntity.setEntityType(entityType);
atsEntity.setStartTime(startTime);
entity.setEntityId(entityId);
entity.setEntityType(entityType);
entity.setStartTime(startTime);
return atsEntity;
return entity;
}
@Override
public ATSEvents getEntityTimelines(String entityType,
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventType) throws IOException {
ATSEvents atsEvents = new ATSEvents();
TimelineEvents events = new TimelineEvents();
if (entityIds == null || entityIds.isEmpty())
return atsEvents;
return events;
// create a lexicographically-ordered map from start time to entities
Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
List<EntityIdentifier>>(new Comparator<byte[]>() {
@ -356,13 +354,13 @@ public int compare(byte[] o1, byte[] o2) {
// start time, end time, event types) for entities whose start times
// were found and add the entities to the return list
byte[] revStartTime = entry.getKey();
for (EntityIdentifier entity : entry.getValue()) {
ATSEventsOfOneEntity atsEntity = new ATSEventsOfOneEntity();
atsEntity.setEntityId(entity.getId());
atsEntity.setEntityType(entityType);
atsEvents.addEvent(atsEntity);
for (EntityIdentifier entityID : entry.getValue()) {
EventsOfOneEntity entity = new EventsOfOneEntity();
entity.setEntityId(entityID.getId());
entity.setEntityType(entityType);
events.addEvent(entity);
KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).add(revStartTime).add(entity.getId())
.add(entityType).add(revStartTime).add(entityID.getId())
.add(TIME_COLUMN);
byte[] prefix = kb.getBytesForLookup();
if (windowEnd == null) {
@ -380,24 +378,24 @@ public int compare(byte[] o1, byte[] o2) {
limit = DEFAULT_LIMIT;
}
iterator = db.iterator();
for (iterator.seek(first); atsEntity.getEvents().size() < limit &&
for (iterator.seek(first); entity.getEvents().size() < limit &&
iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0))
break;
ATSEvent event = getEntityEvent(eventType, key, prefix.length,
TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
iterator.peekNext().getValue());
if (event != null)
atsEntity.addEvent(event);
entity.addEvent(event);
}
}
}
} finally {
IOUtils.cleanup(LOG, iterator);
}
return atsEvents;
return events;
}
/**
@ -412,7 +410,7 @@ private static boolean prefixMatches(byte[] prefix, int prefixlen,
}
@Override
public ATSEntities getEntities(String entityType,
public TimelineEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) throws IOException {
@ -447,7 +445,7 @@ public ATSEntities getEntities(String entityType,
* @return A list of entities
* @throws IOException
*/
private ATSEntities getEntityByTime(byte[] base,
private TimelineEntities getEntityByTime(byte[] base,
String entityType, Long limit, Long starttime, Long endtime,
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
throws IOException {
@ -476,36 +474,36 @@ private ATSEntities getEntityByTime(byte[] base,
limit = DEFAULT_LIMIT;
}
ATSEntities atsEntities = new ATSEntities();
TimelineEntities entities = new TimelineEntities();
iterator = db.iterator();
iterator.seek(first);
// iterate until one of the following conditions is met: limit is
// reached, there are no more keys, the key prefix no longer matches,
// or a start time has been specified and reached/exceeded
while (atsEntities.getEntities().size() < limit && iterator.hasNext()) {
while (entities.getEntities().size() < limit && iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
WritableComparator.compareBytes(key, 0, key.length, last, 0,
last.length) > 0))
break;
// read the start time and entity from the current key
// read the start time and entityId from the current key
KeyParser kp = new KeyParser(key, prefix.length);
Long startTime = kp.getNextLong();
String entity = kp.getNextString();
String entityId = kp.getNextString();
// parse the entity that owns this key, iterating over all keys for
// the entity
ATSEntity atsEntity = getEntity(entity, entityType, startTime,
TimelineEntity entity = getEntity(entityId, entityType, startTime,
fields, iterator, key, kp.getOffset());
if (atsEntity == null)
if (entity == null)
continue;
// determine if the retrieved entity matches the provided secondary
// filters, and if so add it to the list of entities to return
boolean filterPassed = true;
if (secondaryFilters != null) {
for (NameValuePair filter : secondaryFilters) {
Object v = atsEntity.getOtherInfo().get(filter.getName());
Object v = entity.getOtherInfo().get(filter.getName());
if (v == null) {
Set<Object> vs = atsEntity.getPrimaryFilters()
Set<Object> vs = entity.getPrimaryFilters()
.get(filter.getName());
if (vs != null && !vs.contains(filter.getValue())) {
filterPassed = false;
@ -518,45 +516,45 @@ private ATSEntities getEntityByTime(byte[] base,
}
}
if (filterPassed)
atsEntities.addEntity(atsEntity);
entities.addEntity(entity);
}
return atsEntities;
return entities;
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
/**
* Put a single entity. If there is an error, add a PutError to the given
* Put a single entity. If there is an error, add a TimelinePutError to the given
* response.
*/
private void put(ATSEntity atsEntity, ATSPutErrors response) {
private void put(TimelineEntity entity, TimelinePutResponse response) {
WriteBatch writeBatch = null;
try {
writeBatch = db.createWriteBatch();
List<ATSEvent> events = atsEntity.getEvents();
List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity
byte[] revStartTime = getStartTime(atsEntity.getEntityId(),
atsEntity.getEntityType(), atsEntity.getStartTime(), events,
byte[] revStartTime = getStartTime(entity.getEntityId(),
entity.getEntityType(), entity.getStartTime(), events,
writeBatch);
if (revStartTime == null) {
// if no start time is found, add an error and return
ATSPutError error = new ATSPutError();
error.setEntityId(atsEntity.getEntityId());
error.setEntityType(atsEntity.getEntityType());
error.setErrorCode(ATSPutError.NO_START_TIME);
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.NO_START_TIME);
response.addError(error);
return;
}
Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
Map<String, Set<Object>> primaryFilters = atsEntity.getPrimaryFilters();
Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
// write event entries
if (events != null && !events.isEmpty()) {
for (ATSEvent event : events) {
for (TimelineEvent event : events) {
byte[] revts = writeReverseOrderedLong(event.getTimestamp());
byte[] key = createEntityEventKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, revts,
byte[] key = createEntityEventKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, revts,
event.getEventType());
byte[] value = GenericObjectMapper.write(event.getEventInfo());
writeBatch.put(key, value);
@ -566,7 +564,7 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
// write related entity entries
Map<String, Set<String>> relatedEntities =
atsEntity.getRelatedEntities();
entity.getRelatedEntities();
if (relatedEntities != null && !relatedEntities.isEmpty()) {
for (Entry<String, Set<String>> relatedEntityList :
relatedEntities.entrySet()) {
@ -588,7 +586,7 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
// write reverse entry (related entity -> entity)
byte[] key = createReleatedEntityKey(relatedEntityId,
relatedEntityType, relatedEntityStartTime,
atsEntity.getEntityId(), atsEntity.getEntityType());
entity.getEntityId(), entity.getEntityType());
writeBatch.put(key, EMPTY_BYTES);
// TODO: write forward entry (entity -> related entity)?
}
@ -600,8 +598,8 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
for (Entry<String, Set<Object>> primaryFilter :
primaryFilters.entrySet()) {
for (Object primaryFilterValue : primaryFilter.getValue()) {
byte[] key = createPrimaryFilterKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime,
byte[] key = createPrimaryFilterKey(entity.getEntityId(),
entity.getEntityType(), revStartTime,
primaryFilter.getKey(), primaryFilterValue);
writeBatch.put(key, EMPTY_BYTES);
writePrimaryFilterEntries(writeBatch, primaryFilters, key,
@ -611,11 +609,11 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
}
// write other info entries
Map<String, Object> otherInfo = atsEntity.getOtherInfo();
Map<String, Object> otherInfo = entity.getOtherInfo();
if (otherInfo != null && !otherInfo.isEmpty()) {
for (Entry<String, Object> i : otherInfo.entrySet()) {
byte[] key = createOtherInfoKey(atsEntity.getEntityId(),
atsEntity.getEntityType(), revStartTime, i.getKey());
byte[] key = createOtherInfoKey(entity.getEntityId(),
entity.getEntityType(), revStartTime, i.getKey());
byte[] value = GenericObjectMapper.write(i.getValue());
writeBatch.put(key, value);
writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
@ -623,12 +621,12 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) {
}
db.write(writeBatch);
} catch (IOException e) {
LOG.error("Error putting entity " + atsEntity.getEntityId() +
" of type " + atsEntity.getEntityType(), e);
ATSPutError error = new ATSPutError();
error.setEntityId(atsEntity.getEntityId());
error.setEntityType(atsEntity.getEntityType());
error.setErrorCode(ATSPutError.IO_EXCEPTION);
LOG.error("Error putting entity " + entity.getEntityId() +
" of type " + entity.getEntityType(), e);
TimelinePutError error = new TimelinePutError();
error.setEntityId(entity.getEntityId());
error.setEntityType(entity.getEntityType());
error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error);
} finally {
IOUtils.cleanup(LOG, writeBatch);
@ -653,10 +651,10 @@ private static void writePrimaryFilterEntries(WriteBatch writeBatch,
}
@Override
public ATSPutErrors put(ATSEntities atsEntities) {
ATSPutErrors response = new ATSPutErrors();
for (ATSEntity atsEntity : atsEntities.getEntities()) {
put(atsEntity, response);
public TimelinePutResponse put(TimelineEntities entities) {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response);
}
return response;
}
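For a caller of the store, the contract above means per-entity failures come back in the returned TimelinePutResponse rather than as exceptions. A rough caller-side sketch, assuming store is some TimelineStore and entities is an already populated TimelineEntities (neither variable is defined in this patch):

import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;

TimelinePutResponse response = store.put(entities);
for (TimelinePutError error : response.getErrors()) {
  if (error.getErrorCode() == TimelinePutError.NO_START_TIME) {
    // the entity carried neither an explicit start time nor any events to derive one from
    System.err.println("Dropped " + error.getEntityType() + "/"
        + error.getEntityId() + ": no start time");
  } else if (error.getErrorCode() == TimelinePutError.IO_EXCEPTION) {
    // the backing store failed while writing this particular entity
    System.err.println("Failed to store " + error.getEntityType() + "/"
        + error.getEntityId());
  }
}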
@ -676,7 +674,7 @@ public ATSPutErrors put(ATSEntities atsEntities) {
* @throws IOException
*/
private byte[] getStartTime(String entityId, String entityType,
Long startTime, List<ATSEvent> events, WriteBatch writeBatch)
Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) {
@ -696,7 +694,7 @@ private byte[] getStartTime(String entityId, String entityType,
return null;
}
Long min = Long.MAX_VALUE;
for (ATSEvent e : events)
for (TimelineEvent e : events)
if (min > e.getTimestamp())
min = e.getTimestamp();
startTime = min;
@ -772,13 +770,13 @@ private static byte[] createEntityEventKey(String entity, String entitytype,
* event type is not contained in the specified set of event types,
* returns null.
*/
private static ATSEvent getEntityEvent(Set<String> eventTypes, byte[] key,
private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key,
int offset, byte[] value) throws IOException {
KeyParser kp = new KeyParser(key, offset);
long ts = kp.getNextLong();
String tstype = kp.getNextString();
if (eventTypes == null || eventTypes.contains(tstype)) {
ATSEvent event = new ATSEvent();
TimelineEvent event = new TimelineEvent();
event.setTimestamp(ts);
event.setEventType(tstype);
Object o = GenericObjectMapper.read(value);
@ -812,12 +810,12 @@ private static byte[] createPrimaryFilterKey(String entity,
* Parses the primary filter from the given key at the given offset and
* adds it to the given entity.
*/
private static void addPrimaryFilter(ATSEntity atsEntity, byte[] key,
private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String name = kp.getNextString();
Object value = GenericObjectMapper.read(key, kp.getOffset());
atsEntity.addPrimaryFilter(name, value);
entity.addPrimaryFilter(name, value);
}
/**
@ -856,12 +854,12 @@ private static byte[] createReleatedEntityKey(String entity,
* Parses the related entity from the given key at the given offset and
* adds it to the given entity.
*/
private static void addRelatedEntity(ATSEntity atsEntity, byte[] key,
private static void addRelatedEntity(TimelineEntity entity, byte[] key,
int offset) throws IOException {
KeyParser kp = new KeyParser(key, offset);
String type = kp.getNextString();
String id = kp.getNextString();
atsEntity.addRelatedEntity(type, id);
entity.addRelatedEntity(type, id);
}
/**

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.util.ArrayList;
import java.util.Arrays;
@ -33,16 +33,16 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
/**
* In-memory implementation of {@link ApplicationTimelineStore}. This
* In-memory implementation of {@link TimelineStore}. This
* implementation is for test purposes only. If users instantiate it
* improperly, they may end up reading and writing history data against
* different in-memory stores.
@ -50,18 +50,18 @@
*/
@Private
@Unstable
public class MemoryApplicationTimelineStore
extends AbstractService implements ApplicationTimelineStore {
public class MemoryTimelineStore
extends AbstractService implements TimelineStore {
private Map<EntityIdentifier, ATSEntity> entities =
new HashMap<EntityIdentifier, ATSEntity>();
private Map<EntityIdentifier, TimelineEntity> entities =
new HashMap<EntityIdentifier, TimelineEntity>();
public MemoryApplicationTimelineStore() {
super(MemoryApplicationTimelineStore.class.getName());
public MemoryTimelineStore() {
super(MemoryTimelineStore.class.getName());
}
@Override
public ATSEntities getEntities(String entityType, Long limit,
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) {
if (limit == null) {
@ -76,8 +76,8 @@ public ATSEntities getEntities(String entityType, Long limit,
if (fields == null) {
fields = EnumSet.allOf(Field.class);
}
List<ATSEntity> entitiesSelected = new ArrayList<ATSEntity>();
for (ATSEntity entity : new PriorityQueue<ATSEntity>(entities.values())) {
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
for (TimelineEntity entity : new PriorityQueue<TimelineEntity>(entities.values())) {
if (entitiesSelected.size() >= limit) {
break;
}
@ -109,23 +109,23 @@ public ATSEntities getEntities(String entityType, Long limit,
}
entitiesSelected.add(entity);
}
List<ATSEntity> entitiesToReturn = new ArrayList<ATSEntity>();
for (ATSEntity entitySelected : entitiesSelected) {
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(maskFields(entitySelected, fields));
}
Collections.sort(entitiesToReturn);
ATSEntities entitiesWrapper = new ATSEntities();
TimelineEntities entitiesWrapper = new TimelineEntities();
entitiesWrapper.setEntities(entitiesToReturn);
return entitiesWrapper;
}
@Override
public ATSEntity getEntity(String entityId, String entityType,
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
}
ATSEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType));
if (entity == null) {
return null;
} else {
@ -134,11 +134,11 @@ public ATSEntity getEntity(String entityId, String entityType,
}
@Override
public ATSEvents getEntityTimelines(String entityType,
public TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd,
Set<String> eventTypes) {
ATSEvents allEvents = new ATSEvents();
TimelineEvents allEvents = new TimelineEvents();
if (entityIds == null) {
return allEvents;
}
@ -153,14 +153,14 @@ public ATSEvents getEntityTimelines(String entityType,
}
for (String entityId : entityIds) {
EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
ATSEntity entity = entities.get(entityID);
TimelineEntity entity = entities.get(entityID);
if (entity == null) {
continue;
}
ATSEventsOfOneEntity events = new ATSEventsOfOneEntity();
EventsOfOneEntity events = new EventsOfOneEntity();
events.setEntityId(entityId);
events.setEntityType(entityType);
for (ATSEvent event : entity.getEvents()) {
for (TimelineEvent event : entity.getEvents()) {
if (events.getEvents().size() >= limit) {
break;
}
@ -181,15 +181,15 @@ public ATSEvents getEntityTimelines(String entityType,
}
@Override
public ATSPutErrors put(ATSEntities data) {
ATSPutErrors errors = new ATSPutErrors();
for (ATSEntity entity : data.getEntities()) {
public TimelinePutResponse put(TimelineEntities data) {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : data.getEntities()) {
EntityIdentifier entityId =
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
// store entity info in memory
ATSEntity existingEntity = entities.get(entityId);
TimelineEntity existingEntity = entities.get(entityId);
if (existingEntity == null) {
existingEntity = new ATSEntity();
existingEntity = new TimelineEntity();
existingEntity.setEntityId(entity.getEntityId());
existingEntity.setEntityType(entity.getEntityType());
existingEntity.setStartTime(entity.getStartTime());
@ -207,11 +207,11 @@ public ATSPutErrors put(ATSEntities data) {
if (existingEntity.getStartTime() == null) {
if (existingEntity.getEvents() == null
|| existingEntity.getEvents().isEmpty()) {
ATSPutError error = new ATSPutError();
TimelinePutError error = new TimelinePutError();
error.setEntityId(entityId.getId());
error.setEntityType(entityId.getType());
error.setErrorCode(ATSPutError.NO_START_TIME);
errors.addError(error);
error.setErrorCode(TimelinePutError.NO_START_TIME);
response.addError(error);
entities.remove(entityId);
continue;
} else {
@ -244,12 +244,12 @@ public ATSPutErrors put(ATSEntities data) {
for (String idStr : partRelatedEntities.getValue()) {
EntityIdentifier relatedEntityId =
new EntityIdentifier(idStr, partRelatedEntities.getKey());
ATSEntity relatedEntity = entities.get(relatedEntityId);
TimelineEntity relatedEntity = entities.get(relatedEntityId);
if (relatedEntity != null) {
relatedEntity.addRelatedEntity(
existingEntity.getEntityType(), existingEntity.getEntityId());
} else {
relatedEntity = new ATSEntity();
relatedEntity = new TimelineEntity();
relatedEntity.setEntityId(relatedEntityId.getId());
relatedEntity.setEntityType(relatedEntityId.getType());
relatedEntity.setStartTime(existingEntity.getStartTime());
@ -260,13 +260,13 @@ public ATSPutErrors put(ATSEntities data) {
}
}
}
return errors;
return response;
}
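Because the in-memory store is entirely self-contained, it is the quickest way to exercise the renamed records end to end. A minimal round-trip sketch; the entity id, entity type and event type are arbitrary example values rather than anything this patch defines:

import java.util.EnumSet;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;

MemoryTimelineStore store = new MemoryTimelineStore();
store.init(new YarnConfiguration());
store.start();

TimelineEvent event = new TimelineEvent();
event.setEventType("start_event");
event.setTimestamp(System.currentTimeMillis());

TimelineEntity entity = new TimelineEntity();
entity.setEntityId("entity_1");
entity.setEntityType("type_1");
entity.addEvent(event);  // the store derives the start time from the earliest event

TimelineEntities entities = new TimelineEntities();
entities.addEntity(entity);
store.put(entities);

// read the entity back with every field populated
TimelineEntity stored =
    store.getEntity("entity_1", "type_1", EnumSet.allOf(Field.class));

store.stop();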
private static ATSEntity maskFields(
ATSEntity entity, EnumSet<Field> fields) {
private static TimelineEntity maskFields(
TimelineEntity entity, EnumSet<Field> fields) {
// Conceal the fields that are not going to be exposed
ATSEntity entityToReturn = new ATSEntity();
TimelineEntity entityToReturn = new TimelineEntity();
entityToReturn.setEntityId(entity.getEntityId());
entityToReturn.setEntityType(entity.getEntityType());
entityToReturn.setStartTime(entity.getStartTime());

View File

@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A class holding a name and value pair, used for specifying filters in
* {@link ApplicationTimelineReader}.
* {@link TimelineReader}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
/**
* This interface is for retrieving timeline information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface TimelineReader {
/**
* Possible fields to retrieve for {@link #getEntities} and
* {@link #getEntity}.
*/
enum Field {
EVENTS,
RELATED_ENTITIES,
PRIMARY_FILTERS,
OTHER_INFO,
LAST_EVENT_ONLY
}
/**
* Default limit for {@link #getEntities} and {@link #getEntityTimelines}.
*/
final long DEFAULT_LIMIT = 100;
/**
* This method retrieves a list of entity information, {@link TimelineEntity}, sorted
* by the starting timestamp for the entity, descending.
*
* @param entityType
* The type of entities to return (required).
* @param limit
* A limit on the number of entities to return. If null, defaults to
* {@link #DEFAULT_LIMIT}.
* @param windowStart
* The earliest start timestamp to retrieve (exclusive). If null,
* defaults to retrieving all entities until the limit is reached.
* @param windowEnd
* The latest start timestamp to retrieve (inclusive). If null,
* defaults to {@link Long#MAX_VALUE}
* @param primaryFilter
* Retrieves only entities that have the specified primary filter. If
* null, retrieves all entities. This is an indexed retrieval, and no
* entities that do not match the filter are scanned.
* @param secondaryFilters
* Retrieves only entities that have exact matches for all the
* specified filters in their primary filters or other info. This is
* not an indexed retrieval, so all entities are scanned but only
* those matching the filters are returned.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve (see
* {@link Field}). If the set of fields contains
* {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
* most recent event for each entity is retrieved. If null, retrieves
* all fields.
* @return A {@link TimelineEntities} object.
* @throws IOException
*/
TimelineEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
* This method retrieves the entity information for a given entity.
*
* @param entityId
* The entity whose information will be retrieved.
* @param entityType
* The type of the entity.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve (see
* {@link Field}). If the set of fields contains
* {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the
* most recent event for each entity is retrieved. If null, retrieves
* all fields.
* @return A {@link TimelineEntity} object.
* @throws IOException
*/
TimelineEntity getEntity(String entityId, String entityType, EnumSet<Field>
fieldsToRetrieve) throws IOException;
/**
* This method retrieves the events for a list of entities all of the same
* entity type. The events for each entity are sorted in order of their
* timestamps, descending.
*
* @param entityType
* The type of entities to retrieve events for.
* @param entityIds
* The entity IDs to retrieve events for.
* @param limit
* A limit on the number of events to return for each entity. If
* null, defaults to {@link #DEFAULT_LIMIT} events per entity.
* @param windowStart
* If not null, retrieves only events later than the given time
* (exclusive)
* @param windowEnd
* If not null, retrieves only events earlier than the given time
* (inclusive)
* @param eventTypes
* Restricts the events returned to the given types. If null, events
* of all types will be returned.
* @return A {@link TimelineEvents} object.
* @throws IOException
*/
TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventTypes) throws IOException;
}
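To make the parameter semantics above concrete, a query against any TimelineReader implementation might look like the sketch below. Here reader stands for some implementation of this interface, and the entity type, filter values and ids are made-up examples:

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;

// at most 50 "type_1" entities that started at or before now,
// restricted to entities carrying the primary filter user=alice
TimelineEntities result = reader.getEntities(
    "type_1",                            // entityType (required)
    50L,                                 // limit
    null,                                // windowStart: no lower bound
    System.currentTimeMillis(),          // windowEnd (inclusive)
    new NameValuePair("user", "alice"),  // primaryFilter, an indexed lookup
    null,                                // secondaryFilters: none
    EnumSet.of(Field.EVENTS, Field.OTHER_INFO));

// the newest events, at most 10 per entity, for two specific entities
TimelineEvents timelines = reader.getEntityTimelines(
    "type_1",
    new TreeSet<String>(Arrays.asList("id_1", "id_2")),
    10L,                                 // limit per entity
    null,                                // windowStart
    null,                                // windowEnd
    Collections.singleton("end_event"));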

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -24,6 +24,6 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ApplicationTimelineStore extends
Service, ApplicationTimelineReader, ApplicationTimelineWriter {
public interface TimelineStore extends
Service, TimelineReader, TimelineWriter {
}

View File

@ -16,31 +16,31 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import java.io.IOException;
/**
* This interface is for storing application timeline information.
* This interface is for storing timeline information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ApplicationTimelineWriter {
public interface TimelineWriter {
/**
* Stores entity information to the application timeline store. Any errors
* occurring for individual put request objects will be reported in the
* response.
*
* @param data An {@link ATSEntities} object.
* @return An {@link ATSPutErrors} object.
* Stores entity information to the timeline store. Any errors occurring for
* individual put request objects will be reported in the response.
*
* @param data
* A {@link TimelineEntities} object.
* @return A {@link TimelinePutResponse} object.
* @throws IOException
*/
ATSPutErrors put(ATSEntities data) throws IOException;
TimelinePutResponse put(TimelineEntities data) throws IOException;
}
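Seen from the implementer's side, the same contract means per-entity problems are recorded in the response, while IOException is reserved for failures of the store as a whole. A bare-bones sketch of a hypothetical implementation (this class is not part of the patch; a real writer would persist each entity where the comment indicates):

import java.io.IOException;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;

public class SkeletonTimelineWriter implements TimelineWriter {
  @Override
  public TimelinePutResponse put(TimelineEntities data) throws IOException {
    TimelinePutResponse response = new TimelinePutResponse();
    for (TimelineEntity entity : data.getEntities()) {
      if (entity.getStartTime() == null
          && (entity.getEvents() == null || entity.getEvents().isEmpty())) {
        // mirror the stores above: nothing to derive a start time from
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.NO_START_TIME);
        response.addError(error);
        continue;
      }
      // persist the entity here
    }
    return response;
  }
}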

View File

@ -16,5 +16,5 @@
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.yarn.server.api.ApplicationContext;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@ -30,22 +30,22 @@
public class AHSWebApp extends WebApp implements YarnWebParams {
private final ApplicationHistoryManager applicationHistoryManager;
private final ApplicationTimelineStore applicationTimelineStore;
private final TimelineStore timelineStore;
public AHSWebApp(ApplicationHistoryManager applicationHistoryManager,
ApplicationTimelineStore applicationTimelineStore) {
TimelineStore timelineStore) {
this.applicationHistoryManager = applicationHistoryManager;
this.applicationTimelineStore = applicationTimelineStore;
this.timelineStore = timelineStore;
}
@Override
public void setup() {
bind(YarnJacksonJaxbJsonProvider.class);
bind(AHSWebServices.class);
bind(ATSWebServices.class);
bind(TimelineWebServices.class);
bind(GenericExceptionHandler.class);
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
bind(ApplicationTimelineStore.class).toInstance(applicationTimelineStore);
bind(TimelineStore.class).toInstance(timelineStore);
route("/", AHSController.class);
route(pajoin("/apps", APP_STATE), AHSController.class);
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");

View File

@ -50,29 +50,29 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineReader.Field;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.NameValuePair;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/apptimeline")
@Path("/ws/v1/timeline")
//TODO: support XML serialization/deserialization
public class ATSWebServices {
public class TimelineWebServices {
private static final Log LOG = LogFactory.getLog(ATSWebServices.class);
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
private ApplicationTimelineStore store;
private TimelineStore store;
@Inject
public ATSWebServices(ApplicationTimelineStore store) {
public TimelineWebServices(TimelineStore store) {
this.store = store;
}
@ -104,7 +104,7 @@ public void setAbout(String about) {
}
/**
* Return the description of the application timeline web services.
* Return the description of the timeline web services.
*/
@GET
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
@ -112,7 +112,7 @@ public AboutInfo about(
@Context HttpServletRequest req,
@Context HttpServletResponse res) {
init(res);
return new AboutInfo("Application Timeline API");
return new AboutInfo("Timeline API");
}
/**
@ -121,7 +121,7 @@ public AboutInfo about(
@GET
@Path("/{entityType}")
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public ATSEntities getEntities(
public TimelineEntities getEntities(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("entityType") String entityType,
@ -132,7 +132,7 @@ public ATSEntities getEntities(
@QueryParam("limit") String limit,
@QueryParam("fields") String fields) {
init(res);
ATSEntities entities = null;
TimelineEntities entities = null;
try {
entities = store.getEntities(
parseStr(entityType),
@ -153,7 +153,7 @@ public ATSEntities getEntities(
Response.Status.INTERNAL_SERVER_ERROR);
}
if (entities == null) {
return new ATSEntities();
return new TimelineEntities();
}
return entities;
}
@ -164,14 +164,14 @@ public ATSEntities getEntities(
@GET
@Path("/{entityType}/{entityId}")
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public ATSEntity getEntity(
public TimelineEntity getEntity(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("entityType") String entityType,
@PathParam("entityId") String entityId,
@QueryParam("fields") String fields) {
init(res);
ATSEntity entity = null;
TimelineEntity entity = null;
try {
entity =
store.getEntity(parseStr(entityId), parseStr(entityType),
@ -196,7 +196,7 @@ public ATSEntity getEntity(
@GET
@Path("/{entityType}/events")
@Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public ATSEvents getEvents(
public TimelineEvents getEvents(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("entityType") String entityType,
@ -206,7 +206,7 @@ public ATSEvents getEvents(
@QueryParam("windowEnd") String windowEnd,
@QueryParam("limit") String limit) {
init(res);
ATSEvents events = null;
TimelineEvents events = null;
try {
events = store.getEntityTimelines(
parseStr(entityType),
@ -224,7 +224,7 @@ public ATSEvents getEvents(
Response.Status.INTERNAL_SERVER_ERROR);
}
if (events == null) {
return new ATSEvents();
return new TimelineEvents();
}
return events;
}
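Because the servlet is now rooted at /ws/v1/timeline and currently produces JSON only, the renamed endpoints can be exercised with any plain HTTP client. A rough sketch using HttpURLConnection; the host, port and entity type are illustrative assumptions about where the history server's web app happens to be running:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

String base = "http://localhost:8188";  // assumption: the web address of the history server
URL url = new URL(base + "/ws/v1/timeline/type_1?limit=10");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Accept", "application/json");

BufferedReader in = new BufferedReader(
    new InputStreamReader(conn.getInputStream(), "UTF-8"));
StringBuilder json = new StringBuilder();
for (String line; (line = in.readLine()) != null; ) {
  json.append(line);
}
in.close();
// json now holds the serialized TimelineEntities for the requested type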
@ -235,13 +235,13 @@ public ATSEvents getEvents(
*/
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public ATSPutErrors postEntities(
public TimelinePutResponse postEntities(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
ATSEntities entities) {
TimelineEntities entities) {
init(res);
if (entities == null) {
return new ATSPutErrors();
return new TimelinePutResponse();
}
try {
return store.put(entities);

View File

@ -15,11 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
import org.junit.Test;
import java.io.IOException;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import java.io.File;
import java.io.IOException;
@ -25,21 +25,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestLeveldbApplicationTimelineStore
extends ApplicationTimelineStoreTestUtils {
public class TestLeveldbTimelineStore
extends TimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
@ -50,9 +44,9 @@ public void setup() throws Exception {
fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
conf.set(YarnConfiguration.ATS_LEVELDB_PATH_PROPERTY,
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath());
store = new LeveldbApplicationTimelineStore();
store = new LeveldbTimelineStore();
store.init(conf);
store.start();
loadTestData();
@ -68,7 +62,7 @@ public void tearDown() throws Exception {
@Test
public void testGetSingleEntity() throws IOException {
super.testGetSingleEntity();
((LeveldbApplicationTimelineStore)store).clearStartTimeCache();
((LeveldbTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity();
}
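Outside of tests, standing up the leveldb-backed store takes the same two steps used here: point the renamed configuration key at a writable directory and drive the store through the normal service lifecycle. A minimal sketch, with an arbitrary example path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;

Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
    "/var/lib/hadoop-yarn/timeline");  // example location, must be writable
LeveldbTimelineStore store = new LeveldbTimelineStore();
store.init(conf);
store.start();
// ... read and write through the TimelineStore API ...
store.stop();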

View File

@ -16,21 +16,23 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class TestMemoryApplicationTimelineStore
extends ApplicationTimelineStoreTestUtils {
public class TestMemoryTimelineStore
extends TimelineStoreTestUtils {
@Before
public void setup() throws Exception {
store = new MemoryApplicationTimelineStore();
store = new MemoryTimelineStore();
store.init(new YarnConfiguration());
store.start();
loadTestData();
@ -42,7 +44,7 @@ public void tearDown() throws Exception {
store.stop();
}
public ApplicationTimelineStore getApplicationTimelineStore() {
public TimelineStore getTimelineStore() {
return store;
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -35,15 +35,17 @@
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineReader.Field;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
public class ApplicationTimelineStoreTestUtils {
public class TimelineStoreTestUtils {
protected static final Map<String, Object> EMPTY_MAP =
Collections.emptyMap();
@ -52,11 +54,11 @@ public class ApplicationTimelineStoreTestUtils {
protected static final Map<String, Set<String>> EMPTY_REL_ENTITIES =
Collections.emptyMap();
protected ApplicationTimelineStore store;
protected String entity1;
protected TimelineStore store;
protected String entityId1;
protected String entityType1;
protected String entity1b;
protected String entity2;
protected String entityId1b;
protected String entityId2;
protected String entityType2;
protected Map<String, Set<Object>> primaryFilters;
protected Map<String, Object> secondaryFilters;
@ -66,19 +68,19 @@ public class ApplicationTimelineStoreTestUtils {
protected NameValuePair userFilter;
protected Collection<NameValuePair> goodTestingFilters;
protected Collection<NameValuePair> badTestingFilters;
protected ATSEvent ev1;
protected ATSEvent ev2;
protected ATSEvent ev3;
protected ATSEvent ev4;
protected TimelineEvent ev1;
protected TimelineEvent ev2;
protected TimelineEvent ev3;
protected TimelineEvent ev4;
protected Map<String, Object> eventInfo;
protected List<ATSEvent> events1;
protected List<ATSEvent> events2;
protected List<TimelineEvent> events1;
protected List<TimelineEvent> events2;
/**
* Load test data into the given store
*/
protected void loadTestData() throws IOException {
ATSEntities atsEntities = new ATSEntities();
TimelineEntities entities = new TimelineEntities();
Map<String, Set<Object>> primaryFilters =
new HashMap<String, Set<Object>>();
Set<Object> l1 = new HashSet<Object>();
@ -94,62 +96,62 @@ protected void loadTestData() throws IOException {
otherInfo1.put("info1", "val1");
otherInfo1.putAll(secondaryFilters);
String entity1 = "id_1";
String entityId1 = "id_1";
String entityType1 = "type_1";
String entity1b = "id_2";
String entity2 = "id_2";
String entityId1b = "id_2";
String entityId2 = "id_2";
String entityType2 = "type_2";
Map<String, Set<String>> relatedEntities =
new HashMap<String, Set<String>>();
relatedEntities.put(entityType2, Collections.singleton(entity2));
relatedEntities.put(entityType2, Collections.singleton(entityId2));
ATSEvent ev3 = createEvent(789l, "launch_event", null);
ATSEvent ev4 = createEvent(-123l, "init_event", null);
List<ATSEvent> events = new ArrayList<ATSEvent>();
TimelineEvent ev3 = createEvent(789l, "launch_event", null);
TimelineEvent ev4 = createEvent(-123l, "init_event", null);
List<TimelineEvent> events = new ArrayList<TimelineEvent>();
events.add(ev3);
events.add(ev4);
atsEntities.setEntities(Collections.singletonList(createEntity(entity2,
entities.setEntities(Collections.singletonList(createEntity(entityId2,
entityType2, null, events, null, null, null)));
ATSPutErrors response = store.put(atsEntities);
TimelinePutResponse response = store.put(entities);
assertEquals(0, response.getErrors().size());
ATSEvent ev1 = createEvent(123l, "start_event", null);
atsEntities.setEntities(Collections.singletonList(createEntity(entity1,
TimelineEvent ev1 = createEvent(123l, "start_event", null);
entities.setEntities(Collections.singletonList(createEntity(entityId1,
entityType1, 123l, Collections.singletonList(ev1),
relatedEntities, primaryFilters, otherInfo1)));
response = store.put(atsEntities);
response = store.put(entities);
assertEquals(0, response.getErrors().size());
atsEntities.setEntities(Collections.singletonList(createEntity(entity1b,
entities.setEntities(Collections.singletonList(createEntity(entityId1b,
entityType1, null, Collections.singletonList(ev1), relatedEntities,
primaryFilters, otherInfo1)));
response = store.put(atsEntities);
response = store.put(entities);
assertEquals(0, response.getErrors().size());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put("event info 1", "val1");
ATSEvent ev2 = createEvent(456l, "end_event", eventInfo);
TimelineEvent ev2 = createEvent(456l, "end_event", eventInfo);
Map<String, Object> otherInfo2 = new HashMap<String, Object>();
otherInfo2.put("info2", "val2");
atsEntities.setEntities(Collections.singletonList(createEntity(entity1,
entities.setEntities(Collections.singletonList(createEntity(entityId1,
entityType1, null, Collections.singletonList(ev2), null,
primaryFilters, otherInfo2)));
response = store.put(atsEntities);
response = store.put(entities);
assertEquals(0, response.getErrors().size());
atsEntities.setEntities(Collections.singletonList(createEntity(entity1b,
entities.setEntities(Collections.singletonList(createEntity(entityId1b,
entityType1, 789l, Collections.singletonList(ev2), null,
primaryFilters, otherInfo2)));
response = store.put(atsEntities);
response = store.put(entities);
assertEquals(0, response.getErrors().size());
atsEntities.setEntities(Collections.singletonList(createEntity(
entities.setEntities(Collections.singletonList(createEntity(
"badentityid", "badentity", null, null, null, null, otherInfo1)));
response = store.put(atsEntities);
response = store.put(entities);
assertEquals(1, response.getErrors().size());
ATSPutError error = response.getErrors().get(0);
TimelinePutError error = response.getErrors().get(0);
assertEquals("badentityid", error.getEntityId());
assertEquals("badentity", error.getEntityType());
assertEquals(ATSPutError.NO_START_TIME, error.getErrorCode());
assertEquals(TimelinePutError.NO_START_TIME, error.getErrorCode());
}
/**
@ -187,10 +189,10 @@ protected void loadVerificationData() throws Exception {
otherInfo.put("info2", "val2");
otherInfo.putAll(secondaryFilters);
entity1 = "id_1";
entityId1 = "id_1";
entityType1 = "type_1";
entity1b = "id_2";
entity2 = "id_2";
entityId1b = "id_2";
entityId2 = "id_2";
entityType2 = "type_2";
ev1 = createEvent(123l, "start_event", null);
@ -198,20 +200,20 @@ protected void loadVerificationData() throws Exception {
eventInfo = new HashMap<String, Object>();
eventInfo.put("event info 1", "val1");
ev2 = createEvent(456l, "end_event", eventInfo);
events1 = new ArrayList<ATSEvent>();
events1 = new ArrayList<TimelineEvent>();
events1.add(ev2);
events1.add(ev1);
relEntityMap =
new HashMap<String, Set<String>>();
Set<String> ids = new HashSet<String>();
ids.add(entity1);
ids.add(entity1b);
ids.add(entityId1);
ids.add(entityId1b);
relEntityMap.put(entityType1, ids);
ev3 = createEvent(789l, "launch_event", null);
ev4 = createEvent(-123l, "init_event", null);
events2 = new ArrayList<ATSEvent>();
events2 = new ArrayList<TimelineEvent>();
events2.add(ev3);
events2.add(ev4);
}
@ -221,39 +223,39 @@ public void testGetSingleEntity() throws IOException {
verifyEntityInfo(null, null, null, null, null, null,
store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entity1, entityType1,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entityId1, entityType1,
EnumSet.allOf(Field.class)));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entity1b, entityType1,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entityId1b, entityType1,
EnumSet.allOf(Field.class)));
verifyEntityInfo(entity2, entityType2, events2, relEntityMap,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, store.getEntity(entity2, entityType2,
verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, store.getEntity(entityId2, entityType2,
EnumSet.allOf(Field.class)));
// test getting single fields
verifyEntityInfo(entity1, entityType1, events1, null, null, null,
store.getEntity(entity1, entityType1, EnumSet.of(Field.EVENTS)));
verifyEntityInfo(entityId1, entityType1, events1, null, null, null,
store.getEntity(entityId1, entityType1, EnumSet.of(Field.EVENTS)));
verifyEntityInfo(entity1, entityType1, Collections.singletonList(ev2),
null, null, null, store.getEntity(entity1, entityType1,
verifyEntityInfo(entityId1, entityType1, Collections.singletonList(ev2),
null, null, null, store.getEntity(entityId1, entityType1,
EnumSet.of(Field.LAST_EVENT_ONLY)));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entity1b, entityType1,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, store.getEntity(entityId1b, entityType1,
null));
verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null,
store.getEntity(entity1, entityType1,
verifyEntityInfo(entityId1, entityType1, null, null, primaryFilters, null,
store.getEntity(entityId1, entityType1,
EnumSet.of(Field.PRIMARY_FILTERS)));
verifyEntityInfo(entity1, entityType1, null, null, null, otherInfo,
store.getEntity(entity1, entityType1, EnumSet.of(Field.OTHER_INFO)));
verifyEntityInfo(entityId1, entityType1, null, null, null, otherInfo,
store.getEntity(entityId1, entityType1, EnumSet.of(Field.OTHER_INFO)));
verifyEntityInfo(entity2, entityType2, null, relEntityMap, null, null,
store.getEntity(entity2, entityType2,
verifyEntityInfo(entityId2, entityType2, null, relEntityMap, null, null,
store.getEntity(entityId2, entityType2,
EnumSet.of(Field.RELATED_ENTITIES)));
}
@ -272,31 +274,31 @@ public void testGetEntities() throws IOException {
store.getEntities("type_3", null, null, null, userFilter,
null, null).getEntities().size());
List<ATSEntity> entities =
List<TimelineEntity> entities =
store.getEntities("type_1", null, null, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
entities = store.getEntities("type_2", null, null, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entity2, entityType2, events2, relEntityMap,
verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
EMPTY_PRIMARY_FILTERS, EMPTY_MAP, entities.get(0));
entities = store.getEntities("type_1", 1l, null, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
entities = store.getEntities("type_1", 1l, 0l, null, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
entities = store.getEntities("type_1", null, 234l, null, null, null,
@ -314,17 +316,17 @@ public void testGetEntities() throws IOException {
entities = store.getEntities("type_1", null, null, 345l, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
entities = store.getEntities("type_1", null, null, 123l, null, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
}
@ -343,12 +345,12 @@ public void testGetEntitiesWithPrimaryFilters() throws IOException {
new NameValuePair("none", "none"), null,
EnumSet.allOf(Field.class)).getEntities().size());
List<ATSEntity> entities = store.getEntities("type_1", null, null, null,
List<TimelineEntity> entities = store.getEntities("type_1", null, null, null,
userFilter, null, EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
entities = store.getEntities("type_2", null, null, null, userFilter, null,
@ -358,13 +360,13 @@ public void testGetEntitiesWithPrimaryFilters() throws IOException {
entities = store.getEntities("type_1", 1l, null, null, userFilter, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
entities = store.getEntities("type_1", 1l, 0l, null, userFilter, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(1, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
entities = store.getEntities("type_1", null, 234l, null, userFilter, null,
@ -378,28 +380,28 @@ public void testGetEntitiesWithPrimaryFilters() throws IOException {
entities = store.getEntities("type_1", null, null, 345l, userFilter, null,
EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
}
public void testGetEntitiesWithSecondaryFilters() throws IOException {
// test using secondary filter
List<ATSEntity> entities = store.getEntities("type_1", null, null, null,
List<TimelineEntity> entities = store.getEntities("type_1", null, null, null,
null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
entities = store.getEntities("type_1", null, null, null, userFilter,
goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
assertEquals(2, entities.size());
verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(0));
verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
primaryFilters, otherInfo, entities.get(1));
entities = store.getEntities("type_1", null, null, null, null,
@ -414,75 +416,75 @@ public void testGetEntitiesWithSecondaryFilters() throws IOException {
public void testGetEvents() throws IOException {
// test getting entity timelines
SortedSet<String> sortedSet = new TreeSet<String>();
sortedSet.add(entity1);
List<ATSEventsOfOneEntity> timelines =
sortedSet.add(entityId1);
List<EventsOfOneEntity> timelines =
store.getEntityTimelines(entityType1, sortedSet, null, null,
null, null).getAllEvents();
assertEquals(1, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2, ev1);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2, ev1);
sortedSet.add(entity1b);
sortedSet.add(entityId1b);
timelines = store.getEntityTimelines(entityType1, sortedSet, null,
null, null, null).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2, ev1);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2, ev1);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2, ev1);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2, ev1);
timelines = store.getEntityTimelines(entityType1, sortedSet, 1l,
null, null, null).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2);
timelines = store.getEntityTimelines(entityType1, sortedSet, null,
345l, null, null).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2);
timelines = store.getEntityTimelines(entityType1, sortedSet, null,
123l, null, null).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2);
timelines = store.getEntityTimelines(entityType1, sortedSet, null,
null, 345l, null).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev1);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev1);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev1);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev1);
timelines = store.getEntityTimelines(entityType1, sortedSet, null,
null, 123l, null).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev1);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev1);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev1);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev1);
timelines = store.getEntityTimelines(entityType1, sortedSet, null,
null, null, Collections.singleton("end_event")).getAllEvents();
assertEquals(2, timelines.size());
verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2);
verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2);
sortedSet.add(entity2);
sortedSet.add(entityId2);
timelines = store.getEntityTimelines(entityType2, sortedSet, null,
null, null, null).getAllEvents();
assertEquals(1, timelines.size());
verifyEntityTimeline(timelines.get(0), entity2, entityType2, ev3, ev4);
verifyEntityTimeline(timelines.get(0), entityId2, entityType2, ev3, ev4);
}
/**
* Verify a single entity
*/
protected static void verifyEntityInfo(String entity, String entityType,
List<ATSEvent> events, Map<String, Set<String>> relatedEntities,
protected static void verifyEntityInfo(String entityId, String entityType,
List<TimelineEvent> events, Map<String, Set<String>> relatedEntities,
Map<String, Set<Object>> primaryFilters, Map<String, Object> otherInfo,
ATSEntity retrievedEntityInfo) {
if (entity == null) {
TimelineEntity retrievedEntityInfo) {
if (entityId == null) {
assertNull(retrievedEntityInfo);
return;
}
assertEquals(entity, retrievedEntityInfo.getEntityId());
assertEquals(entityId, retrievedEntityInfo.getEntityId());
assertEquals(entityType, retrievedEntityInfo.getEntityType());
if (events == null) {
assertNull(retrievedEntityInfo.getEvents());
@ -511,9 +513,9 @@ protected static void verifyEntityInfo(String entity, String entityType,
* Verify timeline events
*/
private static void verifyEntityTimeline(
ATSEventsOfOneEntity retrievedEvents, String entity, String entityType,
ATSEvent... actualEvents) {
assertEquals(entity, retrievedEvents.getEntityId());
EventsOfOneEntity retrievedEvents, String entityId, String entityType,
TimelineEvent... actualEvents) {
assertEquals(entityId, retrievedEvents.getEntityId());
assertEquals(entityType, retrievedEvents.getEntityType());
assertEquals(actualEvents.length, retrievedEvents.getEvents().size());
for (int i = 0; i < actualEvents.length; i++) {
@ -524,36 +526,36 @@ private static void verifyEntityTimeline(
/**
* Create a test entity
*/
protected static ATSEntity createEntity(String entity, String entityType,
Long startTime, List<ATSEvent> events,
protected static TimelineEntity createEntity(String entityId, String entityType,
Long startTime, List<TimelineEvent> events,
Map<String, Set<String>> relatedEntities,
Map<String, Set<Object>> primaryFilters,
Map<String, Object> otherInfo) {
ATSEntity atsEntity = new ATSEntity();
atsEntity.setEntityId(entity);
atsEntity.setEntityType(entityType);
atsEntity.setStartTime(startTime);
atsEntity.setEvents(events);
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(entityId);
entity.setEntityType(entityType);
entity.setStartTime(startTime);
entity.setEvents(events);
if (relatedEntities != null) {
for (Entry<String, Set<String>> e : relatedEntities.entrySet()) {
for (String v : e.getValue()) {
atsEntity.addRelatedEntity(e.getKey(), v);
entity.addRelatedEntity(e.getKey(), v);
}
}
} else {
atsEntity.setRelatedEntities(null);
entity.setRelatedEntities(null);
}
atsEntity.setPrimaryFilters(primaryFilters);
atsEntity.setOtherInfo(otherInfo);
return atsEntity;
entity.setPrimaryFilters(primaryFilters);
entity.setOtherInfo(otherInfo);
return entity;
}
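// Hedged usage sketch (not in the patch): how the renamed helper above can be
// called after the ATSEntity -> TimelineEntity rename. The id, type and
// filter values are made up for illustration; Map/HashMap/Set/Collections are
// assumed to be imported as elsewhere in the file.
protected static TimelineEntity exampleEntity() {
  Map<String, Set<Object>> primaryFilters = new HashMap<String, Set<Object>>();
  primaryFilters.put("user", Collections.<Object>singleton("yarn"));
  return createEntity("app_1", "YARN_APPLICATION", 123l,
      Collections.<TimelineEvent>emptyList(), null, primaryFilters, null);
}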
/**
* Create a test event
*/
private static ATSEvent createEvent(long timestamp, String type, Map<String,
private static TimelineEvent createEvent(long timestamp, String type, Map<String,
Object> info) {
ATSEvent event = new ATSEvent();
TimelineEvent event = new TimelineEvent();
event.setTimestamp(timestamp);
event.setEventType(type);
event.setEventInfo(info);

View File

@ -24,13 +24,13 @@
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.TestMemoryApplicationTimelineStore;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TestMemoryTimelineStore;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.Test;
@ -47,23 +47,23 @@
import com.sun.jersey.test.framework.WebAppDescriptor;
public class TestATSWebServices extends JerseyTest {
public class TestTimelineWebServices extends JerseyTest {
private static ApplicationTimelineStore store;
private static TimelineStore store;
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
bind(YarnJacksonJaxbJsonProvider.class);
bind(ATSWebServices.class);
bind(TimelineWebServices.class);
bind(GenericExceptionHandler.class);
try{
store = mockApplicationTimelineStore();
store = mockTimelineStore();
} catch (Exception e) {
Assert.fail();
}
bind(ApplicationTimelineStore.class).toInstance(store);
bind(TimelineStore.class).toInstance(store);
serve("/*").with(GuiceContainer.class);
}
@ -77,15 +77,15 @@ protected Injector getInjector() {
}
}
private ApplicationTimelineStore mockApplicationTimelineStore()
private TimelineStore mockTimelineStore()
throws Exception {
TestMemoryApplicationTimelineStore store =
new TestMemoryApplicationTimelineStore();
TestMemoryTimelineStore store =
new TestMemoryTimelineStore();
store.setup();
return store.getApplicationTimelineStore();
return store.getTimelineStore();
}
public TestATSWebServices() {
public TestTimelineWebServices() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.applicationhistoryservice.webapp")
.contextListenerClass(GuiceServletConfig.class)
@ -99,28 +99,28 @@ public TestATSWebServices() {
@Test
public void testAbout() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSWebServices.AboutInfo about =
response.getEntity(ATSWebServices.AboutInfo.class);
TimelineWebServices.AboutInfo about =
response.getEntity(TimelineWebServices.AboutInfo.class);
Assert.assertNotNull(about);
Assert.assertEquals("Application Timeline API", about.getAbout());
Assert.assertEquals("Timeline API", about.getAbout());
}
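// Hedged sketch (not in the patch): the same "about" call issued from a plain
// Jersey 1.x client outside JerseyTest. Only the ws/v1/timeline path comes
// from this change; the host and port below are placeholders.
private static void exampleStandaloneAboutCall() {
  com.sun.jersey.api.client.Client client =
      com.sun.jersey.api.client.Client.create();
  TimelineWebServices.AboutInfo about = client
      .resource("http://localhost:8188")
      .path("ws").path("v1").path("timeline")
      .accept(MediaType.APPLICATION_JSON)
      .get(TimelineWebServices.AboutInfo.class);
  System.out.println(about.getAbout()); // expected: "Timeline API"
}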
@Test
public void testGetEntities() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntities entities = response.getEntity(ATSEntities.class);
TimelineEntities entities = response.getEntity(TimelineEntities.class);
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
ATSEntity entity1 = entities.getEntities().get(0);
TimelineEntity entity1 = entities.getEntities().get(0);
Assert.assertNotNull(entity1);
Assert.assertEquals("id_1", entity1.getEntityId());
Assert.assertEquals("type_1", entity1.getEntityType());
@ -128,7 +128,7 @@ public void testGetEntities() throws Exception {
Assert.assertEquals(2, entity1.getEvents().size());
Assert.assertEquals(2, entity1.getPrimaryFilters().size());
Assert.assertEquals(4, entity1.getOtherInfo().size());
ATSEntity entity2 = entities.getEntities().get(1);
TimelineEntity entity2 = entities.getEntities().get(1);
Assert.assertNotNull(entity2);
Assert.assertEquals("id_2", entity2.getEntityId());
Assert.assertEquals("type_1", entity2.getEntityType());
@ -141,12 +141,12 @@ public void testGetEntities() throws Exception {
@Test
public void testGetEntity() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").path("id_1")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
TimelineEntity entity = response.getEntity(TimelineEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
@ -159,12 +159,12 @@ public void testGetEntity() throws Exception {
@Test
public void testGetEntityFields1() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").path("id_1").queryParam("fields", "events,otherinfo")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
TimelineEntity entity = response.getEntity(TimelineEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
@ -177,13 +177,13 @@ public void testGetEntityFields1() throws Exception {
@Test
public void testGetEntityFields2() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").path("id_1").queryParam("fields", "lasteventonly," +
"primaryfilters,relatedentities")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEntity entity = response.getEntity(ATSEntity.class);
TimelineEntity entity = response.getEntity(TimelineEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("id_1", entity.getEntityId());
Assert.assertEquals("type_1", entity.getEntityType());
@ -196,22 +196,22 @@ public void testGetEntityFields2() throws Exception {
@Test
public void testGetEvents() throws Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.path("type_1").path("events")
.queryParam("entityId", "id_1")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSEvents events = response.getEntity(ATSEvents.class);
TimelineEvents events = response.getEntity(TimelineEvents.class);
Assert.assertNotNull(events);
Assert.assertEquals(1, events.getAllEvents().size());
ATSEvents.ATSEventsOfOneEntity partEvents = events.getAllEvents().get(0);
TimelineEvents.EventsOfOneEntity partEvents = events.getAllEvents().get(0);
Assert.assertEquals(2, partEvents.getEvents().size());
ATSEvent event1 = partEvents.getEvents().get(0);
TimelineEvent event1 = partEvents.getEvents().get(0);
Assert.assertEquals(456l, event1.getTimestamp());
Assert.assertEquals("end_event", event1.getEventType());
Assert.assertEquals(1, event1.getEventInfo().size());
ATSEvent event2 = partEvents.getEvents().get(1);
TimelineEvent event2 = partEvents.getEvents().get(1);
Assert.assertEquals(123l, event2.getTimestamp());
Assert.assertEquals("start_event", event2.getEventType());
Assert.assertEquals(0, event2.getEventInfo().size());
@ -219,28 +219,28 @@ public void testGetEvents() throws Exception {
@Test
public void testPostEntities() throws Exception {
ATSEntities entities = new ATSEntities();
ATSEntity entity = new ATSEntity();
TimelineEntities entities = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
entity.setEntityId("test id");
entity.setEntityType("test type");
entity.setStartTime(System.currentTimeMillis());
entities.addEntity(entity);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("apptimeline")
ClientResponse response = r.path("ws").path("v1").path("timeline")
.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, entities);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
ATSPutErrors errors = response.getEntity(ATSPutErrors.class);
Assert.assertNotNull(errors);
Assert.assertEquals(0, errors.getErrors().size());
TimelinePutResponse putResponse = response.getEntity(TimelinePutResponse.class);
Assert.assertNotNull(putResponse);
Assert.assertEquals(0, putResponse.getErrors().size());
// verify the entity exists in the store
response = r.path("ws").path("v1").path("apptimeline")
response = r.path("ws").path("v1").path("timeline")
.path("test type").path("test id")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
entity = response.getEntity(ATSEntity.class);
entity = response.getEntity(TimelineEntity.class);
Assert.assertNotNull(entity);
Assert.assertEquals("test id", entity.getEntityId());
Assert.assertEquals("test type", entity.getEntityType());