From ebf2f90528f641caf570a2741a7bc43868cef893 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Thu, 27 Oct 2016 13:06:18 -0700 Subject: [PATCH] YARN-5715. Introduce entity prefix for return and sort order. Contributed by Rohith Sharma K S. (cherry picked from commit f37288c7e0127b564645e978c7aab2a186fa6be6) --- .../timelineservice/TimelineEntity.java | 36 +++++++++++++++++++ .../yarn/util/TimelineServiceHelper.java | 8 +++++ .../storage/HBaseTimelineWriterImpl.java | 2 +- .../storage/entity/EntityRowKey.java | 26 ++++++++++---- .../storage/entity/EntityRowKeyPrefix.java | 6 ++-- .../storage/entity/EntityTable.java | 4 +-- .../storage/reader/GenericEntityReader.java | 4 ++- .../storage/common/TestRowKeys.java | 24 +++++++------ 8 files changed, 88 insertions(+), 22 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 9c0a983ef99..7a289b96d37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -54,6 +54,7 @@ import org.codehaus.jackson.annotate.JsonSetter; @InterfaceStability.Unstable public class TimelineEntity implements Comparable { protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_"; + public final static long DEFAULT_ENTITY_PREFIX = 0L; /** * Identifier of timeline entity(entity id + entity type). 
@@ -145,6 +146,7 @@ public class TimelineEntity implements Comparable { private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>(); private HashMap<String, Set<String>> relatesToEntities = new HashMap<>(); private Long createdTime; + private long idPrefix; public TimelineEntity() { identifier = new Identifier(); @@ -581,4 +583,38 @@ public class TimelineEntity implements Comparable { return real.toString(); } } + + @XmlElement(name = "idprefix") + public long getIdPrefix() { + if (real == null) { + return idPrefix; + } else { + return real.getIdPrefix(); + } + } + + /** + * Sets idPrefix for an entity. + * <p>

+ * Note: Entities will be stored in the order of idPrefix specified. + * If users decide to set idPrefix for an entity, they MUST provide + * the same prefix for every update of this entity. + *

+ * Example: <blockquote><pre>
+   * TimelineEntity entity = new TimelineEntity();
+   * entity.setIdPrefix(value);
+   * </pre></blockquote>
+ * Users can use {@link TimelineServiceHelper#invertLong(long)} to invert + * the prefix if necessary. + * + * @param entityIdPrefix prefix for an entity. + */ + @JsonSetter("idprefix") + public void setIdPrefix(long entityIdPrefix) { + if (real == null) { + this.idPrefix = entityIdPrefix; + } else { + real.setIdPrefix(entityIdPrefix); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java index e0268a67b8f..65ed18a7a1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java @@ -46,4 +46,12 @@ public final class TimelineServiceHelper { (HashMap) originalMap : new HashMap(originalMap); } + /** + * Inverts the given key by subtracting it from {@code Long.MAX_VALUE}. + * @param key value to be inverted.
+ * @return inverted long + */ + public static long invertLong(long key) { + return Long.MAX_VALUE - key; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index c0d1fbacab9..a8b44f1163c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -159,7 +159,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements } else { EntityRowKey entityRowKey = new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getId()); + te.getType(), te.getIdPrefix(), te.getId()); rowKey = entityRowKey.getRowKey(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index ff22178fd5c..10aeec446a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ 
-33,18 +33,21 @@ public class EntityRowKey { private final Long flowRunId; private final String appId; private final String entityType; + private final long entityIdPrefix; private final String entityId; private final KeyConverter entityRowKeyConverter = new EntityRowKeyConverter(); public EntityRowKey(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType, String entityId) { + Long flowRunId, String appId, String entityType, long entityIdPrefix, + String entityId) { this.clusterId = clusterId; this.userId = userId; this.flowName = flowName; this.flowRunId = flowRunId; this.appId = appId; this.entityType = entityType; + this.entityIdPrefix = entityIdPrefix; this.entityId = entityId; } @@ -76,6 +79,10 @@ public class EntityRowKey { return entityId; } + public long getEntityIdPrefix() { + return entityIdPrefix; + } + /** * Constructs a row key for the entity table as follows: * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}. @@ -126,7 +133,7 @@ public class EntityRowKey { private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE }; + Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }; /* * (non-Javadoc) @@ -172,11 +179,15 @@ public class EntityRowKey { byte[] entityType = Separator.encode(rowKey.getEntityType(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + + byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix()); + byte[] entityId = rowKey.getEntityId() == null ? 
Separator.EMPTY_BYTES : Separator .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId); + byte[] fourth = + Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId); return Separator.QUALIFIERS.join(first, second, third, fourth); } @@ -196,7 +207,7 @@ public class EntityRowKey { public EntityRowKey decode(byte[] rowKey) { byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 7) { + if (rowKeyComponents.length != 8) { throw new IllegalArgumentException("the row key is not valid for " + "an entity"); } @@ -215,11 +226,14 @@ public class EntityRowKey { String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + + long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); + String entityId = - Separator.decode(Bytes.toString(rowKeyComponents[6]), + Separator.decode(Bytes.toString(rowKeyComponents[7]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - entityType, entityId); + entityType, entityPrefixId, entityId); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java index 91461800ef3..ef717c08937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java @@ -41,7 +41,8 @@ public class EntityRowKeyPrefix extends EntityRowKey implements */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType) { - super(clusterId, userId, flowName, flowRunId, appId, entityType, null); + // TODO YARN-5585, change prefix id from 0L + super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null); } /** @@ -57,7 +58,8 @@ public class EntityRowKeyPrefix extends EntityRowKey implements */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - super(clusterId, userId, flowName, flowRunId, appId, null, null); + // TODO YARN-5585, change prefix id from 0L + super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index b194f07fef4..027c8d5c638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -50,8 +50,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas * | flowRunId! | | | configKey2: | * | AppId! 
| created_time: | metricId1: | configValue2 | * | entityType!| 1392993084018 | metricValue2 | | - * | entityId | | @timestamp2 | | - * | | i!infoKey: | | | + * | idPrefix! | | @timestamp2 | | + * | entityId | i!infoKey: | | | * | | infoValue | metricId1: | | * | | | metricValue1 | | * | | r!relatesToKey: | @timestamp2 | | diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 4e1ab8a378c..1e78a180732 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -502,7 +502,9 @@ class GenericEntityReader extends TimelineEntityReader { byte[] rowKey = new EntityRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType(), context.getEntityId()).getRowKey(); + // TODO YARN-5585, change prefix id from 0L + context.getEntityType(), 0L, context.getEntityId()).getRowKey(); + Get get = new Get(rowKey); get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 368b0604d5a..c4c8dcea5c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; @@ -135,34 +136,37 @@ public class TestRowKeys { @Test public void testEntityRowKey() { - String entityId = "!ent!ity!!id!"; - String entityType = "entity!Type"; + TimelineEntity entity = new TimelineEntity(); + entity.setId("!ent!ity!!id!"); + entity.setType("entity!Type"); + byte[] byteRowKey = new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - entityType, entityId).getRowKey(); + entity.getType(), entity.getIdPrefix(), + entity.getId()).getRowKey(); EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey); assertEquals(CLUSTER, rowKey.getClusterId()); assertEquals(USER, rowKey.getUserId()); assertEquals(FLOW_NAME, rowKey.getFlowName()); assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); assertEquals(APPLICATION_ID, rowKey.getAppId()); - assertEquals(entityType, rowKey.getEntityType()); - assertEquals(entityId, rowKey.getEntityId()); + 
assertEquals(entity.getType(), rowKey.getEntityType()); + assertEquals(entity.getId(), rowKey.getEntityId()); byte[] byteRowKeyPrefix = new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID, entityType).getRowKeyPrefix(); + APPLICATION_ID, entity.getType()).getRowKeyPrefix(); byte[][] splits = Separator.QUALIFIERS.split( byteRowKeyPrefix, new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE}); - assertEquals(7, splits.length); - assertEquals(0, splits[6].length); + Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); + assertEquals(8, splits.length); + assertEquals(entity.getIdPrefix(), splits[7].length); assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); - assertEquals(entityType, + assertEquals(entity.getType(), Separator.QUALIFIERS.decode(Bytes.toString(splits[5]))); verifyRowPrefixBytes(byteRowKeyPrefix);