YARN-3984. Adjusted the event column key schema and avoided missing empty event. Contributed by Vrushali C.

(cherry picked from commit 895ccfa1ab9e701f2908586e323249f670fe5544)
Zhijie Shen 2015-08-05 16:28:57 -07:00 committed by Sangjin Lee
parent 57e2498cd4
commit 9422d9b50d
4 changed files with 125 additions and 34 deletions

HBaseTimelineWriterImpl.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -200,20 +201,32 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
                "! Using the current timestamp");
            eventTimestamp = System.currentTimeMillis();
          }
+          byte[] columnQualifierFirst =
+              Bytes.toBytes(Separator.VALUES.encode(eventId));
+          byte[] columnQualifierWithTsBytes =
+              Separator.VALUES.join(columnQualifierFirst,
+                  Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
          Map<String, Object> eventInfo = event.getInfo();
-          if (eventInfo != null) {
+          if ((eventInfo == null) || (eventInfo.size() == 0)) {
+            // add separator since event key is empty
+            byte[] compoundColumnQualifierBytes =
+                Separator.VALUES.join(columnQualifierWithTsBytes,
+                    null);
+            String compoundColumnQualifier =
+                Bytes.toString(compoundColumnQualifierBytes);
+            EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+          } else {
            for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
              // eventId?infoKey
-              byte[] columnQualifierFirst =
-                  Bytes.toBytes(Separator.VALUES.encode(eventId));
              byte[] compoundColumnQualifierBytes =
-                  Separator.VALUES.join(columnQualifierFirst,
+                  Separator.VALUES.join(columnQualifierWithTsBytes,
                      Bytes.toBytes(info.getKey()));
              // convert back to string to avoid additional API on store.
              String compoundColumnQualifier =
                  Bytes.toString(compoundColumnQualifierBytes);
              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                  compoundColumnQualifier, eventTimestamp, info.getValue());
+                  compoundColumnQualifier, null, info.getValue());
            } // for info: eventInfo
          }
        }
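Net effect of the hunk above on the event column schema: each event info entry used to be stored under the qualifier eventId?infoKey with the event time as the HBase cell timestamp, so an event carrying no info map produced no column at all and was lost on read. Now the inverted event timestamp is embedded in the column qualifier itself, and an event with an empty info map still gets a column with an empty value. A minimal sketch of how the compound qualifier is assembled, reusing the Separator, Bytes and TimelineWriterUtils calls from the patch (the local variable names below are illustrative, and the literal values mirror the ones used in the test later in this commit):

  String eventId = "foo_event_id";
  long eventTimestamp = 1436512802000L;
  byte[] first = Bytes.toBytes(Separator.VALUES.encode(eventId));
  byte[] withTs = Separator.VALUES.join(first,
      Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
  // event with info:    eventId | inverted timestamp | infoKey -> info value
  byte[] withInfoKey = Separator.VALUES.join(withTs, Bytes.toBytes("infoKey"));
  // event without info: eventId | inverted timestamp | (empty) -> empty bytes,
  // which is what keeps the empty event from going missing
  byte[] withoutInfo = Separator.VALUES.join(withTs, null);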

TimelineWriterUtils.java

@@ -124,4 +124,17 @@ public class TimelineWriterUtils {
     return segments;
   }
 
+  /**
+   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
+
 }
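The invert() helper moved here is what lets a timestamp be embedded in a byte-ordered HBase key while still surfacing the newest data first: subtracting from Long.MAX_VALUE reverses the ordering, so a later timestamp yields a smaller, earlier-sorting key component. A small self-contained illustration of that property (sketch only, not part of the patch; the timestamps are arbitrary):

  long older = 1436512802000L;
  long newer = older + 5000L;
  // invert() is Long.MAX_VALUE - key, so the newer timestamp inverts to the
  // smaller value and its key sorts first in an ascending scan.
  assert TimelineWriterUtils.invert(newer) < TimelineWriterUtils.invert(older);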

EntityRowKey.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -47,7 +48,7 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
@@ -70,24 +71,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third =
         Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
             te.getId()));
     return Separator.QUALIFIERS.join(first, second, third);
   }
-
-  /**
-   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
-   */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
 
 }

TestHBaseTimelineWriterImpl.java

@@ -43,8 +43,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -274,7 +274,7 @@ public class TestHBaseTimelineWriterImpl {
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
     assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
@@ -317,7 +317,6 @@ public class TestHBaseTimelineWriterImpl {
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       s.setStartRow(startRow);
-      s.setMaxVersions(Integer.MAX_VALUE);
 
       Connection conn = ConnectionFactory.createConnection(c1);
       ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
@@ -331,24 +330,23 @@ public class TestHBaseTimelineWriterImpl {
         assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
             entity));
 
-        // check the events
-        NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
-            EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
+        Map<String, Object> eventsResult =
+            EntityColumnPrefix.EVENT.readResults(result);
         // there should be only one event
         assertEquals(1, eventsResult.size());
         // key name for the event
-        String valueKey = eventId + Separator.VALUES.getValue() + expKey;
-        for (Map.Entry<String, NavigableMap<Long, Object>> e :
+        byte[] compoundColumnQualifierBytes =
+            Separator.VALUES.join(Bytes.toBytes(eventId),
+                Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+                Bytes.toBytes(expKey));
+        String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+        for (Map.Entry<String, Object> e :
            eventsResult.entrySet()) {
          // the value key must match
          assertEquals(valueKey, e.getKey());
-          NavigableMap<Long, Object> value = e.getValue();
+          Object value = e.getValue();
          // there should be only one timestamp and value
-          assertEquals(1, value.size());
-          for (Map.Entry<Long, Object> e2: value.entrySet()) {
-            assertEquals(expTs, e2.getKey());
-            assertEquals(expVal, e2.getValue());
-          }
+          assertEquals(expVal, value.toString());
        }
      }
    }
@@ -360,6 +358,85 @@ public class TestHBaseTimelineWriterImpl {
     }
   }
 
+  @Test
+  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster_emptyeventkey";
+      String user = "user_emptyeventkey";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      s.setStartRow(startRow);
+      s.addFamily(EntityColumnFamily.INFO.getBytes());
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          // key name for the event
+          byte[] compoundColumnQualifierWithTsBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
+                  null);
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
+              eventsResult.entrySet()) {
+            // the column qualifier key must match
+            assertEquals(valueKey, e.getKey());
+            Object value = e.getValue();
+            // value should be empty
+            assertEquals("", value.toString());
+          }
+        }
+      }
+      assertEquals(1, rowCount);
+
+    } finally {
+      hbi.stop();
+      hbi.close();
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();