YARN-5715. Introduce entity prefix for return and sort order. Contributed by Rohith Sharma K S.

(cherry picked from commit f37288c7e0127b564645e978c7aab2a186fa6be6)
This commit is contained in:
Sangjin Lee 2016-10-27 13:06:18 -07:00 committed by Varun Saxena
parent d19726ec37
commit ebf2f90528
8 changed files with 88 additions and 22 deletions

View File

@ -54,6 +54,7 @@ import org.codehaus.jackson.annotate.JsonSetter;
@InterfaceStability.Unstable
public class TimelineEntity implements Comparable<TimelineEntity> {
protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_";
public final static long DEFAULT_ENTITY_PREFIX = 0L;
/**
* Identifier of timeline entity(entity id + entity type).
@ -145,6 +146,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
private Long createdTime;
private long idPrefix;
public TimelineEntity() {
identifier = new Identifier();
@ -581,4 +583,38 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
return real.toString();
}
}
@XmlElement(name = "idprefix")
public long getIdPrefix() {
if (real == null) {
return idPrefix;
} else {
return real.getIdPrefix();
}
}
/**
* Sets idPrefix for an entity.
* <p>
* <b>Note</b>: Entities will be stored in the order of idPrefix specified.
* If users decide to set idPrefix for an entity, they <b>MUST</b> provide
* the same prefix for every update of this entity.
* </p>
* Example: <blockquote><pre>
* TimelineEntity entity = new TimelineEntity();
* entity.setIdPrefix(value);
* </pre></blockquote>
* Users can use {@link TimelineServiceHelper#invertLong(long)} to invert
* the prefix if necessary.
*
* @param entityIdPrefix prefix for an entity.
*/
@JsonSetter("idprefix")
public void setIdPrefix(long entityIdPrefix) {
if (real == null) {
this.idPrefix = entityIdPrefix;
} else {
real.setIdPrefix(entityIdPrefix);
}
}
}

View File

@ -46,4 +46,12 @@ public final class TimelineServiceHelper {
(HashMap<E, V>) originalMap : new HashMap<E, V>(originalMap);
}
/**
* Inverts the given key.
* @param key value to be inverted .
* @return inverted long
*/
public static long invertLong(long key) {
return Long.MAX_VALUE - key;
}
}

View File

@ -159,7 +159,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
} else {
EntityRowKey entityRowKey =
new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
te.getType(), te.getId());
te.getType(), te.getIdPrefix(), te.getId());
rowKey = entityRowKey.getRowKey();
}

View File

@ -33,18 +33,21 @@ public class EntityRowKey {
private final Long flowRunId;
private final String appId;
private final String entityType;
private final long entityIdPrefix;
private final String entityId;
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
new EntityRowKeyConverter();
public EntityRowKey(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType, String entityId) {
Long flowRunId, String appId, String entityType, long entityIdPrefix,
String entityId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowName = flowName;
this.flowRunId = flowRunId;
this.appId = appId;
this.entityType = entityType;
this.entityIdPrefix = entityIdPrefix;
this.entityId = entityId;
}
@ -76,6 +79,10 @@ public class EntityRowKey {
return entityId;
}
public long getEntityIdPrefix() {
return entityIdPrefix;
}
/**
* Constructs a row key for the entity table as follows:
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
@ -126,7 +133,7 @@ public class EntityRowKey {
private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE };
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
/*
* (non-Javadoc)
@ -172,11 +179,15 @@ public class EntityRowKey {
byte[] entityType =
Separator.encode(rowKey.getEntityType(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());
byte[] entityId =
rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS);
byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
byte[] fourth =
Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId);
return Separator.QUALIFIERS.join(first, second, third, fourth);
}
@ -196,7 +207,7 @@ public class EntityRowKey {
public EntityRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 7) {
if (rowKeyComponents.length != 8) {
throw new IllegalArgumentException("the row key is not valid for "
+ "an entity");
}
@ -215,11 +226,14 @@ public class EntityRowKey {
String entityType =
Separator.decode(Bytes.toString(rowKeyComponents[5]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
long entityPrefixId = Bytes.toLong(rowKeyComponents[6]);
String entityId =
Separator.decode(Bytes.toString(rowKeyComponents[6]),
Separator.decode(Bytes.toString(rowKeyComponents[7]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
entityType, entityId);
entityType, entityPrefixId, entityId);
}
}
}

View File

@ -41,7 +41,8 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
*/
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType) {
super(clusterId, userId, flowName, flowRunId, appId, entityType, null);
// TODO YARN-5585, change prefix id from 0L
super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null);
}
/**
@ -57,7 +58,8 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
*/
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
Long flowRunId, String appId) {
super(clusterId, userId, flowName, flowRunId, appId, null, null);
// TODO YARN-5585, change prefix id from 0L
super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null);
}
/*

View File

@ -50,8 +50,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
* | flowRunId! | | | configKey2: |
* | AppId! | created_time: | metricId1: | configValue2 |
* | entityType!| 1392993084018 | metricValue2 | |
* | entityId | | @timestamp2 | |
* | | i!infoKey: | | |
* | idPrefix! | | @timestamp2 | |
* | entityId | i!infoKey: | | |
* | | infoValue | metricId1: | |
* | | | metricValue1 | |
* | | r!relatesToKey: | @timestamp2 | |

View File

@ -502,7 +502,9 @@ class GenericEntityReader extends TimelineEntityReader {
byte[] rowKey =
new EntityRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
context.getEntityType(), context.getEntityId()).getRowKey();
// TODO YARN-5585, change prefix id from 0L
context.getEntityType(), 0L, context.getEntityId()).getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
@ -135,34 +136,37 @@ public class TestRowKeys {
@Test
public void testEntityRowKey() {
String entityId = "!ent!ity!!id!";
String entityType = "entity!Type";
TimelineEntity entity = new TimelineEntity();
entity.setId("!ent!ity!!id!");
entity.setType("entity!Type");
byte[] byteRowKey =
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entityType, entityId).getRowKey();
entity.getType(), entity.getIdPrefix(),
entity.getId()).getRowKey();
EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entityType, rowKey.getEntityType());
assertEquals(entityId, rowKey.getEntityId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getId(), rowKey.getEntityId());
byte[] byteRowKeyPrefix =
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID, entityType).getRowKeyPrefix();
APPLICATION_ID, entity.getType()).getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(
byteRowKeyPrefix,
new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE});
assertEquals(7, splits.length);
assertEquals(0, splits[6].length);
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
assertEquals(8, splits.length);
assertEquals(entity.getIdPrefix(), splits[7].length);
assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
assertEquals(entityType,
assertEquals(entity.getType(),
Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
verifyRowPrefixBytes(byteRowKeyPrefix);