YARN-3908. Fixed bugs in HBaseTimelineWriterImpl. Contributed by Vrushali C and Sangjin Lee.

(cherry picked from commit df0ec473a84871b0effd7ca6faac776210d7df09)
This commit is contained in:
Zhijie Shen 2015-07-27 15:50:28 -07:00 committed by Sangjin Lee
parent 8603736ef2
commit a9fab9b644
8 changed files with 108 additions and 26 deletions

View File

@ -33,6 +33,8 @@ import java.util.Map;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TimelineEvent implements Comparable<TimelineEvent> { public class TimelineEvent implements Comparable<TimelineEvent> {
public static final long INVALID_TIMESTAMP = 0L;
private String id; private String id;
private HashMap<String, Object> info = new HashMap<>(); private HashMap<String, Object> info = new HashMap<>();
private long timestamp; private long timestamp;
@ -83,7 +85,7 @@ public class TimelineEvent implements Comparable<TimelineEvent> {
} }
public boolean isValid() { public boolean isValid() {
return (id != null && timestamp != 0L); return (id != null && timestamp != INVALID_TIMESTAMP);
} }
@Override @Override

View File

@ -141,6 +141,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
te.getModifiedTime()); te.getModifiedTime());
EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
Map<String, Object> info = te.getInfo();
if (info != null) {
for (Map.Entry<String, Object> entry : info.entrySet()) {
EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
null, entry.getValue());
}
}
} }
/** /**
@ -186,6 +193,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (event != null) { if (event != null) {
String eventId = event.getId(); String eventId = event.getId();
if (eventId != null) { if (eventId != null) {
long eventTimestamp = event.getTimestamp();
// if the timestamp is not set, use the current timestamp
if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
LOG.warn("timestamp is not set for event " + eventId +
"! Using the current timestamp");
eventTimestamp = System.currentTimeMillis();
}
Map<String, Object> eventInfo = event.getInfo(); Map<String, Object> eventInfo = event.getInfo();
if (eventInfo != null) { if (eventInfo != null) {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) { for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
@ -198,8 +212,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// convert back to string to avoid additional API on store. // convert back to string to avoid additional API on store.
String compoundColumnQualifier = String compoundColumnQualifier =
Bytes.toString(compoundColumnQualifierBytes); Bytes.toString(compoundColumnQualifierBytes);
EntityColumnPrefix.METRIC.store(rowKey, entityTable, EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifier, null, info.getValue()); compoundColumnQualifier, eventTimestamp, info.getValue());
} // for info: eventInfo } // for info: eventInfo
} }
} }

View File

@ -113,19 +113,22 @@ public class ColumnHelper<T> {
} }
/** /**
* @param result from which to reads timeseries data * @param result from which to reads data with timestamps
* @param columnPrefixBytes optional prefix to limit columns. If null all * @param columnPrefixBytes optional prefix to limit columns. If null all
* columns are returned. * columns are returned.
* @param <V> the type of the values. The values will be cast into that type.
* @return the cell values at each respective time in for form * @return the cell values at each respective time in for form
* {idA={timestamp1->value1}, idA={timestamp2->value2}, * {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}} * idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException * @throws IOException
*/ */
public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults( @SuppressWarnings("unchecked")
Result result, byte[] columnPrefixBytes) throws IOException { public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, byte[] columnPrefixBytes)
throws IOException {
NavigableMap<String, NavigableMap<Long, Number>> results = NavigableMap<String, NavigableMap<Long, V>> results =
new TreeMap<String, NavigableMap<Long, Number>>(); new TreeMap<String, NavigableMap<Long, V>>();
if (result != null) { if (result != null) {
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap = NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
@ -157,13 +160,13 @@ public class ColumnHelper<T> {
// If this column has the prefix we want // If this column has the prefix we want
if (columnName != null) { if (columnName != null) {
NavigableMap<Long, Number> cellResults = NavigableMap<Long, V> cellResults =
new TreeMap<Long, Number>(); new TreeMap<Long, V>();
NavigableMap<Long, byte[]> cells = entry.getValue(); NavigableMap<Long, byte[]> cells = entry.getValue();
if (cells != null) { if (cells != null) {
for (Entry<Long, byte[]> cell : cells.entrySet()) { for (Entry<Long, byte[]> cell : cells.entrySet()) {
Number value = V value =
(Number) GenericObjectMapper.read(cell.getValue()); (V) GenericObjectMapper.read(cell.getValue());
cellResults.put(cell.getKey(), value); cellResults.put(cell.getKey(), value);
} }
} }

View File

@ -72,12 +72,13 @@ public interface ColumnPrefix<T> {
public Map<String, Object> readResults(Result result) throws IOException; public Map<String, Object> readResults(Result result) throws IOException;
/** /**
* @param result from which to reads timeseries data * @param result from which to reads data with timestamps
* @param <V> the type of the values. The values will be cast into that type.
* @return the cell values at each respective time in for form * @return the cell values at each respective time in for form
* {idA={timestamp1->value1}, idA={timestamp2->value2}, * {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}} * idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException * @throws IOException
*/ */
public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults( public <V> NavigableMap<String, NavigableMap<Long, V>>
Result result) throws IOException; readResultsWithTimestamps(Result result) throws IOException;
} }

View File

@ -89,6 +89,13 @@ public enum Separator {
this.quotedValue = Pattern.quote(value); this.quotedValue = Pattern.quote(value);
} }
/**
* @return the original value of the separator
*/
public String getValue() {
return value;
}
/** /**
* Used to make token safe to be used with this separator without collisions. * Used to make token safe to be used with this separator without collisions.
* *

View File

@ -44,6 +44,11 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/ */
RELATES_TO(EntityColumnFamily.INFO, "r"), RELATES_TO(EntityColumnFamily.INFO, "r"),
/**
* To store TimelineEntity info values.
*/
INFO(EntityColumnFamily.INFO, "i"),
/** /**
* Lifecycle events for an entity * Lifecycle events for an entity
*/ */
@ -92,7 +97,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
/** /**
* @return the column name value * @return the column name value
*/ */
private String getColumnPrefix() { public String getColumnPrefix() {
return columnPrefix; return columnPrefix;
} }
@ -150,11 +155,11 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
* *
* @see * @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readTimeseriesResults(org.apache.hadoop.hbase.client.Result) * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
*/ */
public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults( public <T> NavigableMap<String, NavigableMap<Long, T>>
Result result) throws IOException { readResultsWithTimestamps(Result result) throws IOException {
return column.readTimeseriesResults(result, columnPrefixBytes); return column.readResultsWithTimestamps(result, columnPrefixBytes);
} }
/** /**

View File

@ -54,7 +54,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
* | | modified_time: | | | * | | modified_time: | | |
* | | 1392995081012 | metricId2: | | * | | 1392995081012 | metricId2: | |
* | | | metricValue1 | | * | | | metricValue1 | |
* | | r!relatesToKey: | @timestamp2 | | * | | i!infoKey: | @timestamp2 | |
* | | infoValue | | |
* | | | | |
* | | r!relatesToKey: | | |
* | | id3?id4?id5 | | | * | | id3?id4?id5 | | |
* | | | | | * | | | | |
* | | s!isRelatedToKey | | | * | | s!isRelatedToKey | | |
@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
* | | | | | * | | | | |
* | | e!eventId?eventInfoKey: | | | * | | e!eventId?eventInfoKey: | | |
* | | eventInfoValue | | | * | | eventInfoValue | | |
* | | @timestamp | | |
* | | | | | * | | | | |
* | | flowVersion: | | | * | | flowVersion: | | |
* | | versionValue | | | * | | versionValue | | |

View File

@ -43,8 +43,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@ -84,6 +86,12 @@ public class TestHBaseTimelineWriterImpl {
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime); entity.setModifiedTime(mTime);
// add the info map in Timeline Entity
Map<String, Object> infoMap = new HashMap<String, Object>();
infoMap.put("infoMapKey1", "infoMapValue1");
infoMap.put("infoMapKey2", 10);
entity.addInfo(infoMap);
// add the isRelatedToEntity info // add the isRelatedToEntity info
String key = "task"; String key = "task";
String value = "is_related_to_entity_id_here"; String value = "is_related_to_entity_id_here";
@ -177,6 +185,14 @@ public class TestHBaseTimelineWriterImpl {
Long mTime1 = val.longValue(); Long mTime1 = val.longValue();
assertEquals(mTime1, mTime); assertEquals(mTime1, mTime);
Map<String, Object> infoColumns =
EntityColumnPrefix.INFO.readResults(result);
assertEquals(infoMap.size(), infoColumns.size());
for (String infoItem : infoMap.keySet()) {
assertEquals(infoMap.get(infoItem),
infoColumns.get(infoItem));
}
// Remember isRelatedTo is of type Map<String, Set<String>> // Remember isRelatedTo is of type Map<String, Set<String>>
for (String isRelatedToKey : isRelatedTo.keySet()) { for (String isRelatedToKey : isRelatedTo.keySet()) {
Object isRelatedToValue = Object isRelatedToValue =
@ -219,7 +235,7 @@ public class TestHBaseTimelineWriterImpl {
} }
NavigableMap<String, NavigableMap<Long, Number>> metricsResult = NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
EntityColumnPrefix.METRIC.readTimeseriesResults(result); EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
// We got metrics back // We got metrics back
@ -237,7 +253,7 @@ public class TestHBaseTimelineWriterImpl {
} }
} }
assertEquals(1, rowCount); assertEquals(1, rowCount);
assertEquals(15, colCount); assertEquals(17, colCount);
} finally { } finally {
hbi.stop(); hbi.stop();
@ -267,13 +283,18 @@ public class TestHBaseTimelineWriterImpl {
private void testAdditionalEntity() throws IOException { private void testAdditionalEntity() throws IOException {
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId("foo_event_id"); String eventId = "foo_event_id";
event.setTimestamp(System.currentTimeMillis()); event.setId(eventId);
event.addInfo("foo_event", "test"); Long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
final TimelineEntity entity = new TimelineEntity(); final TimelineEntity entity = new TimelineEntity();
entity.setId("attempt_1329348432655_0001_m_000008_18"); entity.setId("attempt_1329348432655_0001_m_000008_18");
entity.setType("FOO_ATTEMPT"); entity.setType("FOO_ATTEMPT");
entity.addEvent(event);
TimelineEntities entities = new TimelineEntities(); TimelineEntities entities = new TimelineEntities();
entities.addEntity(entity); entities.addEntity(entity);
@ -304,6 +325,31 @@ public class TestHBaseTimelineWriterImpl {
for (Result result : scanner) { for (Result result : scanner) {
if (result != null && !result.isEmpty()) { if (result != null && !result.isEmpty()) {
rowCount++; rowCount++;
// check the row key
byte[] row1 = result.getRow();
assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
entity));
// check the events
NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
// there should be only one event
assertEquals(1, eventsResult.size());
// key name for the event
String valueKey = eventId + Separator.VALUES.getValue() + expKey;
for (Map.Entry<String, NavigableMap<Long, Object>> e :
eventsResult.entrySet()) {
// the value key must match
assertEquals(valueKey, e.getKey());
NavigableMap<Long, Object> value = e.getValue();
// there should be only one timestamp and value
assertEquals(1, value.size());
for (Map.Entry<Long, Object> e2: value.entrySet()) {
assertEquals(expTs, e2.getKey());
assertEquals(expVal, e2.getValue());
}
}
} }
} }
assertEquals(1, rowCount); assertEquals(1, rowCount);