From 9422d9b50d90a99062880cf648dd86a764bf97ec Mon Sep 17 00:00:00 2001
From: Zhijie Shen
Date: Wed, 5 Aug 2015 16:28:57 -0700
Subject: [PATCH] YARN-3984. Adjusted the event column key schema and avoided
 missing empty event. Contributed by Vrushali C.

(cherry picked from commit 895ccfa1ab9e701f2908586e323249f670fe5544)
---
 .../storage/HBaseTimelineWriterImpl.java      |  23 +++-
 .../storage/common/TimelineWriterUtils.java   |  13 +++
 .../storage/entity/EntityRowKey.java          |  18 +--
 .../storage/TestHBaseTimelineWriterImpl.java  | 105 +++++++++++++++---
 4 files changed, 125 insertions(+), 34 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index cd2e76e8f3f..3173e87e563 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -200,20 +201,32 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               "! Using the current timestamp");
           eventTimestamp = System.currentTimeMillis();
         }
+        byte[] columnQualifierFirst =
+            Bytes.toBytes(Separator.VALUES.encode(eventId));
+        byte[] columnQualifierWithTsBytes =
+            Separator.VALUES.join(columnQualifierFirst,
+                Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
         Map<String, Object> eventInfo = event.getInfo();
-        if (eventInfo != null) {
+        if ((eventInfo == null) || (eventInfo.size() == 0)) {
+          // add separator since event key is empty
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(columnQualifierWithTsBytes,
+                  null);
+          String compoundColumnQualifier =
+              Bytes.toString(compoundColumnQualifierBytes);
+          EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+              compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+        } else {
           for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
             // eventId?infoKey
-            byte[] columnQualifierFirst =
-                Bytes.toBytes(Separator.VALUES.encode(eventId));
             byte[] compoundColumnQualifierBytes =
-                Separator.VALUES.join(columnQualifierFirst,
+                Separator.VALUES.join(columnQualifierWithTsBytes,
                     Bytes.toBytes(info.getKey()));
             // convert back to string to avoid additional API on store.
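
Note on the schema change above: the event timestamp moves out of the HBase cell timestamp (store() now receives null there) and into the column qualifier itself, inverted so that more recent events sort first within a scan. The following standalone sketch is illustrative only (plain JDK, no HBase on the classpath; the class and helper names are made up here, not from the patch), showing why Long.MAX_VALUE - timestamp yields newest-first ordering under the unsigned lexicographic byte comparison HBase applies to keys and qualifiers:

import java.nio.ByteBuffer;

public class InvertedTimestampSketch {

  // Same arithmetic as the invert() helper this patch adds.
  static long invert(long ts) {
    return Long.MAX_VALUE - ts;
  }

  // Big-endian encoding, matching Bytes.toBytes(long).
  static byte[] toBytes(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Unsigned lexicographic comparison, the order HBase keeps qualifiers in.
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    long older = 1436512802000L;
    long newer = older + 60_000L; // one minute later
    byte[] olderKey = toBytes(invert(older));
    byte[] newerKey = toBytes(invert(newer));
    // The newer event's inverted bytes compare smaller, so it scans first.
    System.out.println(compareUnsigned(newerKey, olderKey) < 0); // prints true
  }
}
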
            String compoundColumnQualifier =
                Bytes.toString(compoundColumnQualifierBytes);
             EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                compoundColumnQualifier, eventTimestamp, info.getValue());
+                compoundColumnQualifier, null, info.getValue());
           } // for info: eventInfo
         }
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index 28a0b6a0b0d..c957bf507fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -124,4 +124,17 @@ public class TimelineWriterUtils {
     return segments;
   }
 
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 61958c2b21f..3e17ad0a0d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -47,7 +48,7 @@ public class EntityRowKey {
         flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
@@ -70,24 +71,11 @@ public class EntityRowKey {
         flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
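
Note on the relocated helper: invert() is an involution, so applying it twice returns the original value; a reader can recover the real flow run id or event timestamp from a stored key by inverting again. A small standalone sketch (illustrative class name, same method body as the TimelineWriterUtils.invert() added above):

public class InvertRoundTrip {

  static long invert(Long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long runid = 1009876543218L;     // a flow run id like the tests use
    long stored = invert(runid);     // the value that goes into the row key
    long recovered = invert(stored); // what a reader computes back
    System.out.println(recovered == runid); // prints true
  }
}

One caveat worth noting: the parameter is a boxed Long, so passing null throws a NullPointerException on unboxing; callers are expected to supply a non-null value.
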
-    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId,
         te.getType(), te.getId()));
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
-  /**
-   * Converts a timestamp into it's inverse timestamp to be used in (row) keys
-   * where we want to have the most recent timestamp in the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted long
-   */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index 31cb5d2e7b7..fd5643dd354 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -43,8 +43,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -274,7 +274,7 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(user, Bytes.toString(rowKeyComponents[0]));
       assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
       assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-      assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+      assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
       assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
       assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
       assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
@@ -317,7 +317,6 @@ public class TestHBaseTimelineWriterImpl {
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       s.setStartRow(startRow);
-      s.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
       ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
 
@@ -331,24 +330,23 @@ public class TestHBaseTimelineWriterImpl {
           assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
               entity));
 
-          // check the events
-          NavigableMap<String, NavigableMap<Long, Object>> eventsResult =
-              EntityColumnPrefix.EVENT.readResultsWithTimestamps(result);
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
           // there should be only one event
           assertEquals(1, eventsResult.size());
           // key name for the event
-          String valueKey = eventId + Separator.VALUES.getValue() + expKey;
-          for (Map.Entry<String, NavigableMap<Long, Object>> e :
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+                  Bytes.toBytes(expKey));
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
               eventsResult.entrySet()) {
             // the value key must match
             assertEquals(valueKey, e.getKey());
-            NavigableMap<Long, Object> value = e.getValue();
+            Object value = e.getValue();
             // there should be only one timestamp and value
-            assertEquals(1, value.size());
-            for (Map.Entry<Long, Object> e2: value.entrySet()) {
-              assertEquals(expTs, e2.getKey());
-              assertEquals(expVal, e2.getValue());
-            }
+            assertEquals(expVal, value.toString());
           }
         }
       }
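
For readers following the updated assertions: the expected column name is now a three-part compound qualifier of event id, inverted timestamp, and info key, joined by Separator.VALUES. The sketch below gives a deliberately simplified, pure-JDK picture of that layout; it renders the separator as '=' and the inverted timestamp as decimal text, whereas the patch actually joins raw byte encodings via Separator.VALUES.join, so treat every literal here as an illustrative assumption:

public class EventQualifierLayout {
  public static void main(String[] args) {
    String eventId = "foo_event_id";
    long expTs = 1436512802000L;
    String infoKey = "some_info_key"; // stand-in for the test's expKey

    // Layout: eventId <sep> invertedTimestamp <sep> infoKey
    String qualifier =
        eventId + "=" + (Long.MAX_VALUE - expTs) + "=" + infoKey;
    System.out.println(qualifier);

    // Splitting recovers all three parts, including the original timestamp.
    String[] parts = qualifier.split("=", 3);
    System.out.println(Long.MAX_VALUE - Long.parseLong(parts[1])); // 1436512802000
  }
}
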
@@ -360,6 +358,85 @@ public class TestHBaseTimelineWriterImpl {
     }
   }
 
+  @Test
+  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = "foo_event_id";
+    event.setId(eventId);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster_emptyeventkey";
+      String user = "user_emptyeventkey";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      s.setStartRow(startRow);
+      s.addFamily(EntityColumnFamily.INFO.getBytes());
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+
+          // check the row key
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          Map<String, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result);
+          // there should be only one event
+          assertEquals(1, eventsResult.size());
+          // key name for the event
+          byte[] compoundColumnQualifierWithTsBytes =
+              Separator.VALUES.join(Bytes.toBytes(eventId),
+                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
+          byte[] compoundColumnQualifierBytes =
+              Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
+                  null);
+          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+          for (Map.Entry<String, Object> e :
+              eventsResult.entrySet()) {
+            // the column qualifier key must match
+            assertEquals(valueKey, e.getKey());
+            Object value = e.getValue();
+            // value should be empty
+            assertEquals("", value.toString());
+          }
+        }
+      }
+      assertEquals(1, rowCount);
+
+    } finally {
+      hbi.stop();
+      hbi.close();
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();
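
The new test above pins down the case this patch fixes: an event with no info map previously produced no column at all, and now produces exactly one column whose qualifier is joined with a null last component (so it ends in a trailing separator) and whose value is an empty byte array. A simplified standalone sketch of that shape, under the same illustrative '=' separator assumption as before:

public class EmptyEventColumnSketch {
  public static void main(String[] args) {
    String eventId = "foo_event_id";
    long expTs = 1436512802000L;

    // Joining (eventId, invertedTs) with a null/empty trailing part leaves
    // the qualifier ending in a separator, which distinguishes an
    // info-less event column from eventId<sep>ts<sep>infoKey columns.
    String qualifier = eventId + "=" + (Long.MAX_VALUE - expTs) + "=";
    byte[] value = new byte[0]; // stands in for TimelineWriterUtils.EMPTY_BYTES

    System.out.println(qualifier.endsWith("=")); // prints true
    System.out.println(value.length);            // prints 0: empty event value
  }
}
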