YARN-5715. Introduce entity prefix for return and sort order. Contributed by Rohith Sharma K S.
This commit is contained in:
parent
b880238c0a
commit
d0145b79fe
|
@ -54,6 +54,7 @@ import org.codehaus.jackson.annotate.JsonSetter;
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class TimelineEntity implements Comparable<TimelineEntity> {
|
public class TimelineEntity implements Comparable<TimelineEntity> {
|
||||||
protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_";
|
protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_";
|
||||||
|
public final static long DEFAULT_ENTITY_PREFIX = 0L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Identifier of timeline entity(entity id + entity type).
|
* Identifier of timeline entity(entity id + entity type).
|
||||||
|
@ -145,6 +146,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
|
||||||
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
|
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
|
||||||
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
|
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
|
||||||
private Long createdTime;
|
private Long createdTime;
|
||||||
|
private long idPrefix;
|
||||||
|
|
||||||
public TimelineEntity() {
|
public TimelineEntity() {
|
||||||
identifier = new Identifier();
|
identifier = new Identifier();
|
||||||
|
@ -581,4 +583,38 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
|
||||||
return real.toString();
|
return real.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@XmlElement(name = "idprefix")
|
||||||
|
public long getIdPrefix() {
|
||||||
|
if (real == null) {
|
||||||
|
return idPrefix;
|
||||||
|
} else {
|
||||||
|
return real.getIdPrefix();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets idPrefix for an entity.
|
||||||
|
* <p>
|
||||||
|
* <b>Note</b>: Entities will be stored in the order of idPrefix specified.
|
||||||
|
* If users decide to set idPrefix for an entity, they <b>MUST</b> provide
|
||||||
|
* the same prefix for every update of this entity.
|
||||||
|
* </p>
|
||||||
|
* Example: <blockquote><pre>
|
||||||
|
* TimelineEntity entity = new TimelineEntity();
|
||||||
|
* entity.setIdPrefix(value);
|
||||||
|
* </pre></blockquote>
|
||||||
|
* Users can use {@link TimelineServiceHelper#invertLong(long)} to invert
|
||||||
|
* the prefix if necessary.
|
||||||
|
*
|
||||||
|
* @param entityIdPrefix prefix for an entity.
|
||||||
|
*/
|
||||||
|
@JsonSetter("idprefix")
|
||||||
|
public void setIdPrefix(long entityIdPrefix) {
|
||||||
|
if (real == null) {
|
||||||
|
this.idPrefix = entityIdPrefix;
|
||||||
|
} else {
|
||||||
|
real.setIdPrefix(entityIdPrefix);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -46,4 +46,12 @@ public final class TimelineServiceHelper {
|
||||||
(HashMap<E, V>) originalMap : new HashMap<E, V>(originalMap);
|
(HashMap<E, V>) originalMap : new HashMap<E, V>(originalMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inverts the given key.
|
||||||
|
* @param key value to be inverted .
|
||||||
|
* @return inverted long
|
||||||
|
*/
|
||||||
|
public static long invertLong(long key) {
|
||||||
|
return Long.MAX_VALUE - key;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
} else {
|
} else {
|
||||||
EntityRowKey entityRowKey =
|
EntityRowKey entityRowKey =
|
||||||
new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
|
new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
|
||||||
te.getType(), te.getId());
|
te.getType(), te.getIdPrefix(), te.getId());
|
||||||
rowKey = entityRowKey.getRowKey();
|
rowKey = entityRowKey.getRowKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,18 +33,21 @@ public class EntityRowKey {
|
||||||
private final Long flowRunId;
|
private final Long flowRunId;
|
||||||
private final String appId;
|
private final String appId;
|
||||||
private final String entityType;
|
private final String entityType;
|
||||||
|
private final long entityIdPrefix;
|
||||||
private final String entityId;
|
private final String entityId;
|
||||||
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
|
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
|
||||||
new EntityRowKeyConverter();
|
new EntityRowKeyConverter();
|
||||||
|
|
||||||
public EntityRowKey(String clusterId, String userId, String flowName,
|
public EntityRowKey(String clusterId, String userId, String flowName,
|
||||||
Long flowRunId, String appId, String entityType, String entityId) {
|
Long flowRunId, String appId, String entityType, long entityIdPrefix,
|
||||||
|
String entityId) {
|
||||||
this.clusterId = clusterId;
|
this.clusterId = clusterId;
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
this.flowName = flowName;
|
this.flowName = flowName;
|
||||||
this.flowRunId = flowRunId;
|
this.flowRunId = flowRunId;
|
||||||
this.appId = appId;
|
this.appId = appId;
|
||||||
this.entityType = entityType;
|
this.entityType = entityType;
|
||||||
|
this.entityIdPrefix = entityIdPrefix;
|
||||||
this.entityId = entityId;
|
this.entityId = entityId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,6 +79,10 @@ public class EntityRowKey {
|
||||||
return entityId;
|
return entityId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getEntityIdPrefix() {
|
||||||
|
return entityIdPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a row key for the entity table as follows:
|
* Constructs a row key for the entity table as follows:
|
||||||
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
|
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
|
||||||
|
@ -126,7 +133,7 @@ public class EntityRowKey {
|
||||||
private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
|
private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
|
||||||
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
|
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
|
||||||
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
|
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
|
||||||
Separator.VARIABLE_SIZE };
|
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (non-Javadoc)
|
* (non-Javadoc)
|
||||||
|
@ -172,11 +179,15 @@ public class EntityRowKey {
|
||||||
byte[] entityType =
|
byte[] entityType =
|
||||||
Separator.encode(rowKey.getEntityType(), Separator.SPACE,
|
Separator.encode(rowKey.getEntityType(), Separator.SPACE,
|
||||||
Separator.TAB, Separator.QUALIFIERS);
|
Separator.TAB, Separator.QUALIFIERS);
|
||||||
|
|
||||||
|
byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());
|
||||||
|
|
||||||
byte[] entityId =
|
byte[] entityId =
|
||||||
rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
|
rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
|
||||||
.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
|
.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
|
||||||
Separator.QUALIFIERS);
|
Separator.QUALIFIERS);
|
||||||
byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
|
byte[] fourth =
|
||||||
|
Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId);
|
||||||
return Separator.QUALIFIERS.join(first, second, third, fourth);
|
return Separator.QUALIFIERS.join(first, second, third, fourth);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,7 +207,7 @@ public class EntityRowKey {
|
||||||
public EntityRowKey decode(byte[] rowKey) {
|
public EntityRowKey decode(byte[] rowKey) {
|
||||||
byte[][] rowKeyComponents =
|
byte[][] rowKeyComponents =
|
||||||
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
|
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
|
||||||
if (rowKeyComponents.length != 7) {
|
if (rowKeyComponents.length != 8) {
|
||||||
throw new IllegalArgumentException("the row key is not valid for "
|
throw new IllegalArgumentException("the row key is not valid for "
|
||||||
+ "an entity");
|
+ "an entity");
|
||||||
}
|
}
|
||||||
|
@ -215,11 +226,14 @@ public class EntityRowKey {
|
||||||
String entityType =
|
String entityType =
|
||||||
Separator.decode(Bytes.toString(rowKeyComponents[5]),
|
Separator.decode(Bytes.toString(rowKeyComponents[5]),
|
||||||
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
|
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
|
||||||
|
|
||||||
|
long entityPrefixId = Bytes.toLong(rowKeyComponents[6]);
|
||||||
|
|
||||||
String entityId =
|
String entityId =
|
||||||
Separator.decode(Bytes.toString(rowKeyComponents[6]),
|
Separator.decode(Bytes.toString(rowKeyComponents[7]),
|
||||||
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
|
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
|
||||||
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
|
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
|
||||||
entityType, entityId);
|
entityType, entityPrefixId, entityId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,8 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
|
||||||
*/
|
*/
|
||||||
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
|
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
|
||||||
Long flowRunId, String appId, String entityType) {
|
Long flowRunId, String appId, String entityType) {
|
||||||
super(clusterId, userId, flowName, flowRunId, appId, entityType, null);
|
// TODO YARN-5585, change prefix id from 0L
|
||||||
|
super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -57,7 +58,8 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
|
||||||
*/
|
*/
|
||||||
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
|
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
|
||||||
Long flowRunId, String appId) {
|
Long flowRunId, String appId) {
|
||||||
super(clusterId, userId, flowName, flowRunId, appId, null, null);
|
// TODO YARN-5585, change prefix id from 0L
|
||||||
|
super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -50,8 +50,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
|
||||||
* | flowRunId! | | | configKey2: |
|
* | flowRunId! | | | configKey2: |
|
||||||
* | AppId! | created_time: | metricId1: | configValue2 |
|
* | AppId! | created_time: | metricId1: | configValue2 |
|
||||||
* | entityType!| 1392993084018 | metricValue2 | |
|
* | entityType!| 1392993084018 | metricValue2 | |
|
||||||
* | entityId | | @timestamp2 | |
|
* | idPrefix! | | @timestamp2 | |
|
||||||
* | | i!infoKey: | | |
|
* | entityId | i!infoKey: | | |
|
||||||
* | | infoValue | metricId1: | |
|
* | | infoValue | metricId1: | |
|
||||||
* | | | metricValue1 | |
|
* | | | metricValue1 | |
|
||||||
* | | r!relatesToKey: | @timestamp2 | |
|
* | | r!relatesToKey: | @timestamp2 | |
|
||||||
|
|
|
@ -502,7 +502,9 @@ class GenericEntityReader extends TimelineEntityReader {
|
||||||
byte[] rowKey =
|
byte[] rowKey =
|
||||||
new EntityRowKey(context.getClusterId(), context.getUserId(),
|
new EntityRowKey(context.getClusterId(), context.getUserId(),
|
||||||
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
|
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
|
||||||
context.getEntityType(), context.getEntityId()).getRowKey();
|
// TODO YARN-5585, change prefix id from 0L
|
||||||
|
context.getEntityType(), 0L, context.getEntityId()).getRowKey();
|
||||||
|
|
||||||
Get get = new Get(rowKey);
|
Get get = new Get(rowKey);
|
||||||
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
|
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
|
||||||
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
if (filterList != null && !filterList.getFilters().isEmpty()) {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
||||||
|
@ -135,34 +136,37 @@ public class TestRowKeys {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEntityRowKey() {
|
public void testEntityRowKey() {
|
||||||
String entityId = "!ent!ity!!id!";
|
TimelineEntity entity = new TimelineEntity();
|
||||||
String entityType = "entity!Type";
|
entity.setId("!ent!ity!!id!");
|
||||||
|
entity.setType("entity!Type");
|
||||||
|
|
||||||
byte[] byteRowKey =
|
byte[] byteRowKey =
|
||||||
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
|
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
|
||||||
entityType, entityId).getRowKey();
|
entity.getType(), entity.getIdPrefix(),
|
||||||
|
entity.getId()).getRowKey();
|
||||||
EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
|
EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
|
||||||
assertEquals(CLUSTER, rowKey.getClusterId());
|
assertEquals(CLUSTER, rowKey.getClusterId());
|
||||||
assertEquals(USER, rowKey.getUserId());
|
assertEquals(USER, rowKey.getUserId());
|
||||||
assertEquals(FLOW_NAME, rowKey.getFlowName());
|
assertEquals(FLOW_NAME, rowKey.getFlowName());
|
||||||
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
|
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
|
||||||
assertEquals(APPLICATION_ID, rowKey.getAppId());
|
assertEquals(APPLICATION_ID, rowKey.getAppId());
|
||||||
assertEquals(entityType, rowKey.getEntityType());
|
assertEquals(entity.getType(), rowKey.getEntityType());
|
||||||
assertEquals(entityId, rowKey.getEntityId());
|
assertEquals(entity.getId(), rowKey.getEntityId());
|
||||||
|
|
||||||
byte[] byteRowKeyPrefix =
|
byte[] byteRowKeyPrefix =
|
||||||
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
|
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
|
||||||
APPLICATION_ID, entityType).getRowKeyPrefix();
|
APPLICATION_ID, entity.getType()).getRowKeyPrefix();
|
||||||
byte[][] splits =
|
byte[][] splits =
|
||||||
Separator.QUALIFIERS.split(
|
Separator.QUALIFIERS.split(
|
||||||
byteRowKeyPrefix,
|
byteRowKeyPrefix,
|
||||||
new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
|
new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
|
||||||
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
|
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
|
||||||
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
|
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
|
||||||
Separator.VARIABLE_SIZE});
|
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
|
||||||
assertEquals(7, splits.length);
|
assertEquals(8, splits.length);
|
||||||
assertEquals(0, splits[6].length);
|
assertEquals(entity.getIdPrefix(), splits[7].length);
|
||||||
assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
|
assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
|
||||||
assertEquals(entityType,
|
assertEquals(entity.getType(),
|
||||||
Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
|
Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
|
||||||
verifyRowPrefixBytes(byteRowKeyPrefix);
|
verifyRowPrefixBytes(byteRowKeyPrefix);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue