YARN-1659. Defined the ApplicationTimelineStore store as an abstraction for implementing different storage impls for storing timeline information. Contributed by Billie Rinaldi.

svn merge --ignore-ancestry -c 1564037 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1564038 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-03 20:07:55 +00:00
parent bd84052ff6
commit fb892d8f84
11 changed files with 541 additions and 28 deletions

View File

@ -63,6 +63,10 @@ Release 2.4.0 - UNRELEASED
used by ResourceManager to read configuration locally or from remote systems used by ResourceManager to read configuration locally or from remote systems
so as to help RM failover. (Xuan Gong via vinodkv) so as to help RM failover. (Xuan Gong via vinodkv)
YARN-1659. Defined the ApplicationTimelineStore store as an abstraction for
implementing different storage impls for storing timeline information.
(Billie Rinaldi via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
@ -53,10 +54,10 @@ public class ATSEntity {
private String entityType; private String entityType;
private String entityId; private String entityId;
private long startTime; private Long startTime;
private List<ATSEvent> events = new ArrayList<ATSEvent>(); private List<ATSEvent> events = new ArrayList<ATSEvent>();
private Map<String, List<Object>> relatedEntities = private Map<String, List<String>> relatedEntities =
new HashMap<String, List<Object>>(); new HashMap<String, List<String>>();
private Map<String, Object> primaryFilters = private Map<String, Object> primaryFilters =
new HashMap<String, Object>(); new HashMap<String, Object>();
private Map<String, Object> otherInfo = private Map<String, Object> otherInfo =
@ -112,7 +113,7 @@ public class ATSEntity {
* @return the start time of the entity * @return the start time of the entity
*/ */
@XmlElement(name = "starttime") @XmlElement(name = "starttime")
public long getStartTime() { public Long getStartTime() {
return startTime; return startTime;
} }
@ -122,7 +123,7 @@ public class ATSEntity {
* @param startTime * @param startTime
* the start time of the entity * the start time of the entity
*/ */
public void setStartTime(long startTime) { public void setStartTime(Long startTime) {
this.startTime = startTime; this.startTime = startTime;
} }
@ -172,26 +173,25 @@ public class ATSEntity {
* @return the related entities * @return the related entities
*/ */
@XmlElement(name = "relatedentities") @XmlElement(name = "relatedentities")
public Map<String, List<Object>> getRelatedEntities() { public Map<String, List<String>> getRelatedEntities() {
return relatedEntities; return relatedEntities;
} }
/** /**
* Add a list of entity of the same type to the existing related entity map * Add an entity to the existing related entity map
* *
* @param entityType * @param entityType
* the entity type * the entity type
* @param entityIds * @param entityId
* a list of entity Ids * the entity Id
*/ */
public void addRelatedEntity(String entityType, List<Object> entityIds) { public void addRelatedEntity(String entityType, String entityId) {
List<Object> thisRelatedEntity = relatedEntities.get(entityType); List<String> thisRelatedEntity = relatedEntities.get(entityType);
relatedEntities.put(entityType, entityIds);
if (thisRelatedEntity == null) { if (thisRelatedEntity == null) {
relatedEntities.put(entityType, entityIds); thisRelatedEntity = new ArrayList<String>();
} else { relatedEntities.put(entityType, thisRelatedEntity);
thisRelatedEntity.addAll(entityIds);
} }
thisRelatedEntity.add(entityId);
} }
/** /**
@ -200,11 +200,10 @@ public class ATSEntity {
* @param relatedEntities * @param relatedEntities
* a map of related entities * a map of related entities
*/ */
public void addRelatedEntities( public void addRelatedEntities(Map<String, List<String>> relatedEntities) {
Map<String, List<Object>> relatedEntities) { for (Entry<String, List<String>> relatedEntity :
for (Map.Entry<String, List<Object>> relatedEntity : relatedEntities relatedEntities.entrySet()) {
.entrySet()) { List<String> thisRelatedEntity =
List<Object> thisRelatedEntity =
this.relatedEntities.get(relatedEntity.getKey()); this.relatedEntities.get(relatedEntity.getKey());
if (thisRelatedEntity == null) { if (thisRelatedEntity == null) {
this.relatedEntities.put( this.relatedEntities.put(
@ -222,7 +221,7 @@ public class ATSEntity {
* a map of related entities * a map of related entities
*/ */
public void setRelatedEntities( public void setRelatedEntities(
Map<String, List<Object>> relatedEntities) { Map<String, List<String>> relatedEntities) {
this.relatedEntities = relatedEntities; this.relatedEntities = relatedEntities;
} }

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
@XmlAccessorType(XmlAccessType.NONE) @XmlAccessorType(XmlAccessType.NONE)
@Public @Public
@Unstable @Unstable
public class ATSEvent { public class ATSEvent implements Comparable<ATSEvent> {
private long timestamp; private long timestamp;
private String eventType; private String eventType;
@ -131,4 +131,42 @@ public class ATSEvent {
this.eventInfo = eventInfo; this.eventInfo = eventInfo;
} }
@Override
public int compareTo(ATSEvent other) {
if (timestamp > other.timestamp) {
return -1;
} else if (timestamp < other.timestamp) {
return 1;
} else {
return eventType.compareTo(other.eventType);
}
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ATSEvent atsEvent = (ATSEvent) o;
if (timestamp != atsEvent.timestamp)
return false;
if (!eventType.equals(atsEvent.eventType))
return false;
if (eventInfo != null ? !eventInfo.equals(atsEvent.eventInfo) :
atsEvent.eventInfo != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + eventType.hashCode();
result = 31 * result + (eventInfo != null ? eventInfo.hashCode() : 0);
return result;
}
} }

View File

@ -160,7 +160,7 @@ public class ATSEvents {
* @param event * @param event
* a single event * a single event
*/ */
public void addEntity(ATSEvent event) { public void addEvent(ATSEvent event) {
events.add(event); events.add(event);
} }

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
/**
* A class that holds a list of put errors. This is the response returned
* when a list of {@link ATSEntity} objects is added to the application
* timeline. If there are errors in storing individual entity objects,
* they will be indicated in the list of errors.
*/
@XmlRootElement(name = "errors")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class ATSPutErrors {
private List<ATSPutError> errors = new ArrayList<ATSPutError>();
public ATSPutErrors() {
}
/**
* Get a list of {@link ATSPutError} instances
*
* @return a list of {@link ATSPutError} instances
*/
@XmlElement(name = "errors")
public List<ATSPutError> getErrors() {
return errors;
}
/**
* Add a single {@link ATSPutError} instance into the existing list
*
* @param error
* a single {@link ATSPutError} instance
*/
public void addError(ATSPutError error) {
errors.add(error);
}
/**
* Add a list of {@link ATSPutError} instances into the existing list
*
* @param errors
* a list of {@link ATSPutError} instances
*/
public void addErrors(List<ATSPutError> errors) {
this.errors.addAll(errors);
}
/**
* Set the list to the given list of {@link ATSPutError} instances
*
* @param errors
* a list of {@link ATSPutError} instances
*/
public void setErrors(List<ATSPutError> errors) {
this.errors.clear();
this.errors.addAll(errors);
}
/**
* A class that holds the error code for one entity.
*/
@XmlRootElement(name = "error")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public static class ATSPutError {
private String entityId;
private String entityType;
private Integer errorCode;
/**
* Get the entity Id
*
* @return the entity Id
*/
@XmlElement(name = "entity")
public String getEntityId() {
return entityId;
}
/**
* Set the entity Id
*
* @param entityId
* the entity Id
*/
public void setEntityId(String entityId) {
this.entityId = entityId;
}
/**
* Get the entity type
*
* @return the entity type
*/
@XmlElement(name = "entitytype")
public String getEntityType() {
return entityType;
}
/**
* Set the entity type
*
* @param entityType
* the entity type
*/
public void setEntityType(String entityType) {
this.entityType = entityType;
}
/**
* Get the error code
*
* @return an error code
*/
@XmlElement(name = "errorcode")
public Integer getErrorCode() {
return errorCode;
}
/**
* Set the error code to the given error code
*
* @param errorCode
* an error code
*/
public void setErrorCode(Integer errorCode) {
this.errorCode = errorCode;
}
}
}

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.api.records.apptimeline; package org.apache.hadoop.yarn.api.records.apptimeline;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
import org.junit.Test; import org.junit.Test;
public class TestApplicationTimelineRecords { public class TestApplicationTimelineRecords {
@ -42,10 +45,8 @@ public class TestApplicationTimelineRecords {
event.addEventInfo("key2", "val2"); event.addEventInfo("key2", "val2");
entity.addEvent(event); entity.addEvent(event);
} }
entity.addRelatedEntity( entity.addRelatedEntity("test ref type 1", "test ref id 1");
"test ref type 1", Arrays.asList((Object) "test ref id 1")); entity.addRelatedEntity("test ref type 2", "test ref id 2");
entity.addRelatedEntity(
"test ref type 2", Arrays.asList((Object) "test ref id 2"));
entity.addPrimaryFilter("pkey1", "pval1"); entity.addPrimaryFilter("pkey1", "pval1");
entity.addPrimaryFilter("pkey2", "pval2"); entity.addPrimaryFilter("pkey2", "pval2");
entity.addOtherInfo("okey1", "oval1"); entity.addOtherInfo("okey1", "oval1");
@ -83,7 +84,7 @@ public class TestApplicationTimelineRecords {
event.setEventType("event type " + i); event.setEventType("event type " + i);
event.addEventInfo("key1", "val1"); event.addEventInfo("key1", "val1");
event.addEventInfo("key2", "val2"); event.addEventInfo("key2", "val2");
partEvents.addEntity(event); partEvents.addEvent(event);
} }
events.addEvent(partEvents); events.addEvent(partEvents);
} }
@ -110,4 +111,36 @@ public class TestApplicationTimelineRecords {
Assert.assertEquals(2, event22.getEventInfo().size()); Assert.assertEquals(2, event22.getEventInfo().size());
} }
@Test
public void testATSPutErrors() {
ATSPutErrors atsPutErrors = new ATSPutErrors();
ATSPutError error1 = new ATSPutError();
error1.setEntityId("entity id 1");
error1.setEntityId("entity type 1");
error1.setErrorCode(1);
atsPutErrors.addError(error1);
List<ATSPutError> errors = new ArrayList<ATSPutError>();
errors.add(error1);
ATSPutError error2 = new ATSPutError();
error2.setEntityId("entity id 2");
error2.setEntityId("entity type 2");
error2.setErrorCode(2);
errors.add(error2);
atsPutErrors.addErrors(errors);
Assert.assertEquals(3, atsPutErrors.getErrors().size());
ATSPutError e = atsPutErrors.getErrors().get(0);
Assert.assertEquals(error1.getEntityId(), e.getEntityId());
Assert.assertEquals(error1.getEntityType(), e.getEntityType());
Assert.assertEquals(error1.getErrorCode(), e.getErrorCode());
e = atsPutErrors.getErrors().get(1);
Assert.assertEquals(error1.getEntityId(), e.getEntityId());
Assert.assertEquals(error1.getEntityType(), e.getEntityType());
Assert.assertEquals(error1.getErrorCode(), e.getErrorCode());
e = atsPutErrors.getErrors().get(2);
Assert.assertEquals(error2.getEntityId(), e.getEntityId());
Assert.assertEquals(error2.getEntityType(), e.getEntityType());
Assert.assertEquals(error2.getErrorCode(), e.getErrorCode());
}
} }

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
/**
* This interface is for retrieving application timeline information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ApplicationTimelineReader {
/**
* Possible fields to retrieve for {@link #getEntities} and {@link
* #getEntity}.
*/
enum Field {
EVENTS,
RELATED_ENTITIES,
PRIMARY_FILTERS,
OTHER_INFO,
LAST_EVENT_ONLY
}
/**
* Default limit for {@link #getEntities} and {@link #getEntityTimelines}.
*/
final long DEFAULT_LIMIT = 100;
/**
* This method retrieves a list of entity information, {@link ATSEntity},
* sorted by the starting timestamp for the entity, descending.
*
* @param entityType The type of entities to return (required).
* @param limit A limit on the number of entities to return. If null,
* defaults to {@link #DEFAULT_LIMIT}.
* @param windowStart The earliest start timestamp to retrieve (exclusive).
* If null, defaults to retrieving all entities until the
* limit is reached.
* @param windowEnd The latest start timestamp to retrieve (inclusive).
* If null, defaults to {@link Long#MAX_VALUE}
* @param primaryFilter Retrieves only entities that have the specified
* primary filter. If null, retrieves all entities.
* This is an indexed retrieval, and no entities that
* do not match the filter are scanned.
* @param secondaryFilters Retrieves only entities that have exact matches
* for all the specified filters in their primary
* filters or other info. This is not an indexed
* retrieval, so all entities are scanned but only
* those matching the filters are returned.
* @param fieldsToRetrieve Specifies which fields of the entity object to
* retrieve (see {@link Field}). If the set of fields
* contains {@link Field#LAST_EVENT_ONLY} and not
* {@link Field#EVENTS}, the most recent event for
* each entity is retrieved.
* @return An {@link ATSEntities} object.
*/
ATSEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve);
/**
* This method retrieves the entity information for a given entity.
*
* @param entity The entity whose information will be retrieved.
* @param entityType The type of the entity.
* @param fieldsToRetrieve Specifies which fields of the entity object to
* retrieve (see {@link Field}). If the set of
* fields contains {@link Field#LAST_EVENT_ONLY} and
* not {@link Field#EVENTS}, the most recent event
* for each entity is retrieved.
* @return An {@link ATSEntity} object.
*/
ATSEntity getEntity(String entity, String entityType, EnumSet<Field>
fieldsToRetrieve);
/**
* This method retrieves the events for a list of entities all of the same
* entity type. The events for each entity are sorted in order of their
* timestamps, descending.
*
* @param entityType The type of entities to retrieve events for.
* @param entityIds The entity IDs to retrieve events for.
* @param limit A limit on the number of events to return for each entity.
* If null, defaults to {@link #DEFAULT_LIMIT} events per
* entity.
* @param windowStart If not null, retrieves only events later than the
* given time (exclusive)
* @param windowEnd If not null, retrieves only events earlier than the
* given time (inclusive)
* @param eventTypes Restricts the events returned to the given types. If
* null, events of all types will be returned.
* @return An {@link ATSEvents} object.
*/
ATSEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd, Set<String> eventTypes);
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.Service;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ApplicationTimelineStore extends
Service, ApplicationTimelineReader, ApplicationTimelineWriter {
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
/**
* This interface is for storing application timeline information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ApplicationTimelineWriter {
/**
* Stores entity information to the application timeline store. Any errors
* occurring for individual put request objects will be reported in the
* response.
*
* @param data An {@link ATSEntities} object.
* @return An {@link ATSPutErrors} object.
*/
ATSPutErrors put(ATSEntities data);
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A class holding a name and value pair, used for specifying filters in
* {@link ApplicationTimelineReader}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NameValuePair {
String name;
Object value;
public NameValuePair(String name, Object value) {
this.name = name;
this.value = value;
}
/**
* Get the name.
* @return The name.
*/
public String getName() {
return name;
}
/**
* Get the value.
* @return The value.
*/
public Object getValue() {
return value;
}
@Override
public String toString() {
return "{ name: " + name + ", value: " + value + " }";
}
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
import org.apache.hadoop.classification.InterfaceAudience;