YARN-3906. Split the application table from the entity table. Contributed by Sangjin Lee.
(cherry picked from commit bcd755eba9466ce277d3c14192c31da6462c4ab3)
parent 9e5155be36
commit 00e85e7a2b
HBaseTimelineReaderImpl.java:
@@ -18,33 +18,6 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -58,6 +31,40 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import com.google.common.base.Preconditions;
+
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {
 
@@ -70,6 +77,7 @@ public class HBaseTimelineReaderImpl
   private Connection conn;
   private EntityTable entityTable;
   private AppToFlowTable appToFlowTable;
+  private ApplicationTable applicationTable;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -82,6 +90,7 @@ public class HBaseTimelineReaderImpl
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable();
     appToFlowTable = new AppToFlowTable();
+    applicationTable = new ApplicationTable();
   }
 
   @Override
@@ -109,14 +118,24 @@ public class HBaseTimelineReaderImpl
       fieldsToRetrieve = EnumSet.noneOf(Field.class);
     }
 
-    byte[] rowKey = EntityRowKey.getRowKey(
-        clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
+    boolean isApplication = isApplicationEntity(entityType);
+    byte[] rowKey = isApplication ?
+        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
+            appId) :
+        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
+            entityType, entityId);
     Get get = new Get(rowKey);
     get.setMaxVersions(Integer.MAX_VALUE);
-    return parseEntity(
-        entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
+    Result result = isApplication ?
+        applicationTable.getResult(hbaseConf, conn, get) :
+        entityTable.getResult(hbaseConf, conn, get);
+    return parseEntity(result, fieldsToRetrieve,
         false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
-        DEFAULT_END_TIME, null, null, null, null, null, null);
+        DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
+  }
+
+  private static boolean isApplicationEntity(String entityType) {
+    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
   }
 
   @Override
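The new isApplicationEntity() check effectively reserves the YARN_APPLICATION type string: a read for that type is routed to the application table, whose row key omits entityType and entityId because the appId alone names the row. A sketch of the dispatch, using only the helpers shown in this hunk (the argument variables come from the surrounding method):

  // Reserved type: requests for YARN_APPLICATION take the application path.
  boolean isApplication = TimelineEntityType.YARN_APPLICATION.toString()
      .equals(entityType);
  byte[] rowKey = isApplication ?
      ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
          appId) :                          // the appId alone names the row
      EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
          entityType, entityId);            // type and id are still required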
@@ -155,26 +174,46 @@ public class HBaseTimelineReaderImpl
     }
 
     NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    // Scan through part of the table to find the entities belong to one app and
-    // one type
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        clusterId, userId, flowId, flowRunId, appId, entityType));
-    scan.setMaxVersions(Integer.MAX_VALUE);
-    ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan);
-    for (Result result : scanner) {
+    boolean isApplication = isApplicationEntity(entityType);
+    if (isApplication) {
+      // If getEntities() is called for an application, there can be at most
+      // one entity. If the entity passes the filter, it is returned. Otherwise,
+      // an empty set is returned.
+      byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
+          flowRunId, appId);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Result result = applicationTable.getResult(hbaseConf, conn, get);
       TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-          true, createdTimeBegin, createdTimeEnd,
-          true, modifiedTimeBegin, modifiedTimeEnd,
-          isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
-          metricFilters);
-      if (entity == null) {
-        continue;
+          true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
+          modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
+          eventFilters, metricFilters, isApplication);
+      if (entity != null) {
+        entities.add(entity);
       }
-      if (entities.size() > limit) {
-        entities.pollLast();
+    } else {
+      // Scan through part of the table to find the entities belong to one app
+      // and one type
+      Scan scan = new Scan();
+      scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+          clusterId, userId, flowId, flowRunId, appId, entityType));
+      scan.setMaxVersions(Integer.MAX_VALUE);
+      ResultScanner scanner =
+          entityTable.getResultScanner(hbaseConf, conn, scan);
+      for (Result result : scanner) {
+        TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
+            true, createdTimeBegin, createdTimeEnd,
+            true, modifiedTimeBegin, modifiedTimeEnd,
+            isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+            metricFilters, isApplication);
+        if (entity == null) {
+          continue;
+        }
+        if (entities.size() > limit) {
+          entities.pollLast();
+        }
+        entities.add(entity);
       }
-      entities.add(entity);
     }
     return entities;
   }
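The branch above also changes the access pattern: an application read is now a single point Get against the application table instead of a row-prefix Scan over the entity table. A minimal sketch of the two shapes, using only the helpers visible in this hunk (the variables come from the surrounding method):

  // Application entities: one row, keyed without entityType/entityId.
  Get get = new Get(ApplicationRowKey.getRowKey(clusterId, userId, flowId,
      flowRunId, appId));                      // point lookup of a single row
  // All other entity types: every row sharing the app-scoped key prefix.
  Scan scan = new Scan();
  scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(clusterId, userId,
      flowId, flowRunId, appId, entityType));  // prefix scan over many rows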
@@ -221,26 +260,37 @@ public class HBaseTimelineReaderImpl
       boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
       Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
       Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> eventFilters, Set<String> metricFilters)
+      Set<String> eventFilters, Set<String> metricFilters,
+      boolean isApplication)
       throws IOException {
     if (result == null || result.isEmpty()) {
       return null;
     }
     TimelineEntity entity = new TimelineEntity();
-    entity.setType(EntityColumn.TYPE.readResult(result).toString());
-    entity.setId(EntityColumn.ID.readResult(result).toString());
+    String entityType = isApplication ?
+        TimelineEntityType.YARN_APPLICATION.toString() :
+        EntityColumn.TYPE.readResult(result).toString();
+    entity.setType(entityType);
+    String entityId = isApplication ?
+        ApplicationColumn.ID.readResult(result).toString() :
+        EntityColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
 
     // fetch created time
-    entity.setCreatedTime(
-        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    Number createdTime = isApplication ?
+        (Number)ApplicationColumn.CREATED_TIME.readResult(result) :
+        (Number)EntityColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
     if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
         entity.getCreatedTime() > createdTimeEnd)) {
       return null;
     }
 
     // fetch modified time
-    entity.setCreatedTime(
-        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    Number modifiedTime = isApplication ?
+        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
+        (Number)EntityColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
     if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
         entity.getModifiedTime() > modifiedTimeEnd)) {
       return null;
@@ -250,7 +300,13 @@ public class HBaseTimelineReaderImpl
     boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (isApplication) {
+        readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+            true);
+      } else {
+        readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
+            true);
+      }
       if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
@@ -265,7 +321,12 @@ public class HBaseTimelineReaderImpl
     boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (isApplication) {
+        readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+            false);
+      } else {
+        readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      }
       if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
@@ -280,7 +341,11 @@ public class HBaseTimelineReaderImpl
     boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO);
+      if (isApplication) {
+        readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+      } else {
+        readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+      }
       if (checkInfo &&
           !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
@@ -295,7 +360,11 @@ public class HBaseTimelineReaderImpl
     boolean checkConfigs = configFilters != null && configFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (isApplication) {
+        readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+      } else {
+        readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+      }
       if (checkConfigs && !TimelineReaderUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
@@ -310,7 +379,7 @@ public class HBaseTimelineReaderImpl
     boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result);
+      readEvents(entity, result, isApplication);
       if (checkEvents && !TimelineReaderUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
@@ -325,7 +394,7 @@ public class HBaseTimelineReaderImpl
     boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result);
+      readMetrics(entity, result, isApplication);
       if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
@@ -338,15 +407,15 @@ public class HBaseTimelineReaderImpl
     return entity;
   }
 
-  private static void readRelationship(
-      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
-      throws IOException {
+  private static <T> void readRelationship(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isRelatedTo) throws IOException {
     // isRelatedTo and relatesTo are of type Map<String, Set<String>>
     Map<String, Object> columns = prefix.readResults(result);
     for (Map.Entry<String, Object> column : columns.entrySet()) {
       for (String id : Separator.VALUES.splitEncoded(
           column.getValue().toString())) {
-        if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
+        if (isRelatedTo) {
           entity.addIsRelatedToEntity(column.getKey(), id);
         } else {
           entity.addRelatesToEntity(column.getKey(), id);
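This is the pattern the rest of the patch follows: helpers that were hard-wired to EntityColumnPrefix become generic over ColumnPrefix<T>, so one parser serves both tables, and an explicit boolean replaces the prefix-identity check (which has no single answer once IS_RELATED_TO exists in two unrelated enums). The two call shapes, exactly as used in the parseEntity() hunks above:

  // T is inferred as ApplicationTable or EntityTable from the enum constant.
  readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true);
  readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);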
@@ -355,12 +424,12 @@ public class HBaseTimelineReaderImpl
     }
   }
 
-  private static void readKeyValuePairs(
-      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
-      throws IOException {
+  private static <T> void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isConfig) throws IOException {
     // info and configuration are of type Map<String, Object or String>
     Map<String, Object> columns = prefix.readResults(result);
-    if (prefix.equals(EntityColumnPrefix.CONFIG)) {
+    if (isConfig) {
       for (Map.Entry<String, Object> column : columns.entrySet()) {
         entity.addConfig(column.getKey(), column.getKey().toString());
       }
@@ -369,10 +438,11 @@ public class HBaseTimelineReaderImpl
     }
   }
 
-  private static void readEvents(TimelineEntity entity, Result result)
-      throws IOException {
+  private static void readEvents(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
     Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<String, Object> eventsResult =
+    Map<String, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.readResults(result) :
         EntityColumnPrefix.EVENT.readResults(result);
     for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
       Collection<String> tokens =
@@ -405,10 +475,16 @@ public class HBaseTimelineReaderImpl
     entity.addEvents(eventsSet);
   }
 
-  private static void readMetrics(TimelineEntity entity, Result result)
-      throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+  private static void readMetrics(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
+    if (isApplication) {
+      metricsResult =
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    } else {
+      metricsResult =
+          EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    }
     for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
         metricsResult.entrySet()) {
       TimelineMetric metric = new TimelineMetric();
HBaseTimelineWriterImpl.java:
@@ -38,9 +38,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -61,6 +66,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
   private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+  private TypedBufferedMutator<ApplicationTable> applicationTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -84,6 +90,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
     appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
+    applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -102,18 +109,20 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         continue;
       }
 
-      byte[] rowKey =
+      // if the entity is the application, the destination is the application
+      // table
+      boolean isApplication = isApplicationEntity(te);
+      byte[] rowKey = isApplication ?
+          ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
+              appId) :
           EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
               te.getType(), te.getId());
 
-      storeInfo(rowKey, te, flowVersion);
-      storeEvents(rowKey, te.getEvents());
-      storeConfig(rowKey, te.getConfigs());
-      storeMetrics(rowKey, te.getMetrics());
-      storeRelations(rowKey, te.getIsRelatedToEntities(),
-          EntityColumnPrefix.IS_RELATED_TO);
-      storeRelations(rowKey, te.getRelatesToEntities(),
-          EntityColumnPrefix.RELATES_TO);
+      storeInfo(rowKey, te, flowVersion, isApplication);
+      storeEvents(rowKey, te.getEvents(), isApplication);
+      storeConfig(rowKey, te.getConfigs(), isApplication);
+      storeMetrics(rowKey, te.getMetrics(), isApplication);
+      storeRelations(rowKey, te, isApplication);
 
       if (isApplicationCreated(te)) {
         onApplicationCreated(
@@ -123,9 +132,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     return putStatus;
   }
 
+  private static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
   private static boolean isApplicationCreated(TimelineEntity te) {
-    if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) {
-      boolean isAppCreated = false;
+    if (isApplicationEntity(te)) {
       for (TimelineEvent event : te.getEvents()) {
         if (event.getId().equals(
             ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
@@ -145,41 +157,74 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         rowKey, appToFlowTable, null, flowRunId);
   }
 
+  private void storeRelations(byte[] rowKey, TimelineEntity te,
+      boolean isApplication) throws IOException {
+    if (isApplication) {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+    } else {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+    }
+  }
+
   /**
    * Stores the Relations from the {@linkplain TimelineEntity} object
    */
-  private void storeRelations(byte[] rowKey,
+  private <T> void storeRelations(byte[] rowKey,
       Map<String, Set<String>> connectedEntities,
-      EntityColumnPrefix entityColumnPrefix) throws IOException {
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
     for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
         .entrySet()) {
       // id3?id4?id5
       String compoundValue =
          Separator.VALUES.joinEncoded(connectedEntity.getValue());
 
-      entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(),
-          null, compoundValue);
+      columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
+          compoundValue);
     }
   }
 
   /**
    * Stores information from the {@linkplain TimelineEntity} object
    */
-  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
-      throws IOException {
+  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
+      boolean isApplication) throws IOException {
 
-    EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
-    EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
-    EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
-        te.getCreatedTime());
-    EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
-        te.getModifiedTime());
-    EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
-    Map<String, Object> info = te.getInfo();
-    if (info != null) {
-      for (Map.Entry<String, Object> entry : info.entrySet()) {
-        EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
-            null, entry.getValue());
+    if (isApplication) {
+      ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
+      ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
+          te.getCreatedTime());
+      ApplicationColumn.MODIFIED_TIME.store(rowKey, applicationTable, null,
+          te.getModifiedTime());
+      ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
+          flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
+              entry.getKey(), null, entry.getValue());
+        }
+      }
+    } else {
+      EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+      EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+      EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
+          te.getCreatedTime());
+      EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
+          te.getModifiedTime());
+      EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
+              null, entry.getValue());
+        }
       }
     }
   }
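On the write side the same generics tie the column prefix and the buffered mutator to one table type, so a prefix/table mismatch fails at compile time instead of silently writing to the wrong table. A sketch (the relation values are made up; the method and fields are the ones in this hunk):

  Map<String, Set<String>> isRelatedTo = new HashMap<>();
  isRelatedTo.put("YARN_CONTAINER",
      Collections.singleton("container_e01_1111111111_0001_01_000001"));
  // T = ApplicationTable: prefix and mutator agree, so this compiles.
  storeRelations(rowKey, isRelatedTo,
      ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
  // Mixing EntityColumnPrefix with applicationTable leaves no valid T, so
  // javac rejects it rather than the bug surfacing at runtime.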
@@ -187,14 +232,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /**
    * stores the config information from {@linkplain TimelineEntity}
    */
-  private void storeConfig(byte[] rowKey, Map<String, String> config)
-      throws IOException {
+  private void storeConfig(byte[] rowKey, Map<String, String> config,
+      boolean isApplication) throws IOException {
     if (config == null) {
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
-          null, entry.getValue());
+      if (isApplication) {
+        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
+            entry.getKey(), null, entry.getValue());
+      } else {
+        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+            null, entry.getValue());
+      }
     }
   }
 
@@ -202,16 +252,21 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    * stores the {@linkplain TimelineMetric} information from the
    * {@linkplain TimelineEvent} object
    */
-  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics)
-      throws IOException {
+  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      boolean isApplication) throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
         String metricColumnQualifier = metric.getId();
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
-          EntityColumnPrefix.METRIC.store(rowKey, entityTable,
-              metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          if (isApplication) {
+            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          } else {
+            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          }
         }
       }
     }
@@ -220,8 +275,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /**
   * Stores the events from the {@linkplain TimelineEvent} object
   */
-  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events)
-      throws IOException {
+  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      boolean isApplication) throws IOException {
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
@@ -258,8 +313,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
             // convert back to string to avoid additional API on store.
             String compoundColumnQualifier =
                 Bytes.toString(compoundColumnQualifierBytes);
-            EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                compoundColumnQualifier, null, info.getValue());
+            if (isApplication) {
+              ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                  compoundColumnQualifier, null, info.getValue());
+            } else {
+              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                  compoundColumnQualifier, null, info.getValue());
+            }
           } // for info: eventInfo
         }
       }
@@ -279,6 +339,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     // flush all buffered mutators
     entityTable.flush();
     appToFlowTable.flush();
+    applicationTable.flush();
   }
 
   /**
@@ -288,15 +349,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   @Override
   protected void serviceStop() throws Exception {
     if (entityTable != null) {
-      LOG.info("closing entity table");
+      LOG.info("closing the entity table");
       // The close API performs flushing and releases any resources held
       entityTable.close();
     }
     if (appToFlowTable != null) {
-      LOG.info("closing app_flow table");
+      LOG.info("closing the app_flow table");
       // The close API performs flushing and releases any resources held
       appToFlowTable.close();
     }
+    if (applicationTable != null) {
+      LOG.info("closing the application table");
+      applicationTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();
TimelineSchemaCreator.java:
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -76,6 +77,12 @@ public class TimelineSchemaCreator {
     if (StringUtils.isNotBlank(appToflowTableName)) {
       hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
     }
+    // Grab the applicationTableName argument
+    String applicationTableName = commandLine.getOptionValue("a");
+    if (StringUtils.isNotBlank(applicationTableName)) {
+      hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
+          applicationTableName);
+    }
     createAllTables(hbaseConf);
   }
 
@@ -103,6 +110,8 @@ public class TimelineSchemaCreator {
 
     o = new Option("a2f", "appToflowTableName", true, "app to flow table name");
     o.setArgName("appToflowTableName");
+    o = new Option("a", "applicationTableName", true, "application table name");
+    o.setArgName("applicationTableName");
     o.setRequired(false);
     options.addOption(o);
 
@@ -132,6 +141,7 @@ public class TimelineSchemaCreator {
       }
       new EntityTable().createTable(admin, hbaseConf);
       new AppToFlowTable().createTable(admin, hbaseConf);
+      new ApplicationTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();
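Taken together, the TimelineSchemaCreator hunks let a deployment override the new table's name before creation; hypothetically, passing "-a timeline.application" on the command line reduces to:

  hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME, "timeline.application");
  new ApplicationTable().createTable(admin, hbaseConf);

One caveat visible in the option hunk above: the local o holding the "a2f" Option is reassigned to the "a" Option before options.addOption(o) runs, so as written only "a" is registered by that block.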
ApplicationColumn.java (new file):
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link ApplicationTable}.
+ */
+public enum ApplicationColumn implements Column<ApplicationTable> {
+
+  /**
+   * App id
+   */
+  ID(ApplicationColumnFamily.INFO, "id"),
+
+  /**
+   * When the application was created.
+   */
+  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time"),
+
+  /**
+   * When it was modified.
+   */
+  MODIFIED_TIME(ApplicationColumnFamily.INFO, "modified_time"),
+
+  /**
+   * The version of the flow that this app belongs to.
+   */
+  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<ApplicationTable> column;
+  private final ColumnFamily<ApplicationTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<ApplicationTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null
+   */
+  public static final ApplicationColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based only on name.
+      if (ac.getColumnQualifier().equals(columnQualifier)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null if both
+   *         arguments don't match.
+   */
+  public static final ApplicationColumn columnFor(
+      ApplicationColumnFamily columnFamily, String name) {
+
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based column family and on name.
+      if (ac.columnFamily.equals(columnFamily)
+          && ac.getColumnQualifier().equals(name)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
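ApplicationColumn mirrors EntityColumn for the fixed, known columns; each constant encapsulates its qualifier bytes so callers never handle raw column names. The round trip, assembled from calls that appear in the writer and reader hunks above (rowKey, applicationTable, te, entity, and result all come from that surrounding code):

  // write path (HBaseTimelineWriterImpl.storeInfo)
  ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
      te.getCreatedTime());
  // read path (HBaseTimelineReaderImpl.parseEntity)
  Number createdTime =
      (Number) ApplicationColumn.CREATED_TIME.readResult(result);
  entity.setCreatedTime(createdTime.longValue());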
ApplicationColumnFamily.java (new file):
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the application table column families.
+ */
+public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i"),
+
+  /**
+   * Configurations are in a separate column family for two reasons: a) the size
+   * of the config values can be very large and b) we expect that config values
+   * are often separately accessed from other metrics and info columns.
+   */
+  CONFIGS("c"),
+
+  /**
+   * Metrics have a separate column family, because they have a separate TTL.
+   */
+  METRICS("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private ApplicationColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
@ -0,0 +1,217 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Identifies partially qualified columns for the application table.
|
||||||
|
*/
|
||||||
|
public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To store TimelineEntity getIsRelatedToEntities values.
|
||||||
|
*/
|
||||||
|
IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To store TimelineEntity getRelatesToEntities values.
|
||||||
|
*/
|
||||||
|
RELATES_TO(ApplicationColumnFamily.INFO, "r"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To store TimelineEntity info values.
|
||||||
|
*/
|
||||||
|
INFO(ApplicationColumnFamily.INFO, "i"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lifecycle events for an application
|
||||||
|
*/
|
||||||
|
EVENT(ApplicationColumnFamily.INFO, "e"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config column stores configuration with config key as the column name.
|
||||||
|
*/
|
||||||
|
CONFIG(ApplicationColumnFamily.CONFIGS, null),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics are stored with the metric name as the column name.
|
||||||
|
*/
|
||||||
|
METRIC(ApplicationColumnFamily.METRICS, null);

  private final ColumnHelper<ApplicationTable> column;
  private final ColumnFamily<ApplicationTable> columnFamily;

  /**
   * Can be null for those cases where the provided column qualifier is the
   * entire column name.
   */
  private final String columnPrefix;
  private final byte[] columnPrefixBytes;

  /**
   * Private constructor, meant to be used by the enum definition.
   *
   * @param columnFamily that this column is stored in.
   * @param columnPrefix for this column.
   */
  private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
      String columnPrefix) {
    column = new ColumnHelper<ApplicationTable>(columnFamily);
    this.columnFamily = columnFamily;
    this.columnPrefix = columnPrefix;
    if (columnPrefix == null) {
      this.columnPrefixBytes = null;
    } else {
      // Future-proof by ensuring the right column prefix hygiene.
      this.columnPrefixBytes =
          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
    }
  }

  /**
   * @return the column name prefix, or null if none was set.
   */
  private String getColumnPrefix() {
    return columnPrefix;
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #store(byte[],
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
   */
  public void store(byte[] rowKey,
      TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
      Long timestamp, Object inputValue) throws IOException {

    // Null check
    if (qualifier == null) {
      throw new IOException("Cannot store column with null qualifier in "
          + tableMutator.getName().getNameAsString());
    }

    byte[] columnQualifier =
        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);

    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
   */
  public Object readResult(Result result, String qualifier) throws IOException {
    byte[] columnQualifier =
        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
    return column.readResult(result, columnQualifier);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResults(org.apache.hadoop.hbase.client.Result)
   */
  public Map<String, Object> readResults(Result result) throws IOException {
    return column.readResults(result, columnPrefixBytes);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
   */
  public <V> NavigableMap<String, NavigableMap<Long, V>>
      readResultsWithTimestamps(Result result) throws IOException {
    return column.readResultsWithTimestamps(result, columnPrefixBytes);
  }

  /**
   * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there
   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
   * if and only if {@code x.equals(y)} or {@code (x == y == null)}.
   *
   * @param columnPrefix Name of the column to retrieve
   * @return the corresponding {@link ApplicationColumnPrefix} or null
   */
  public static final ApplicationColumnPrefix columnFor(String columnPrefix) {

    // Match column based on value, assume column family matches.
    for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) {
      // Find a match based only on name.
      if (acp.getColumnPrefix().equals(columnPrefix)) {
        return acp;
      }
    }

    // Default to null
    return null;
  }

  /**
   * Retrieve an {@link ApplicationColumnPrefix} given a column family and a
   * name, or null if there is no match. The following holds true:
   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}.
   *
   * @param columnFamily The columnFamily for which to retrieve the column.
   * @param columnPrefix Name of the column to retrieve
   * @return the corresponding {@link ApplicationColumnPrefix} or null if both
   *         arguments don't match.
   */
  public static final ApplicationColumnPrefix columnFor(
      ApplicationColumnFamily columnFamily, String columnPrefix) {

    // TODO: needs unit test to confirm and need to update javadoc to explain
    // null prefix case.

    for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) {
      // Find a match based on column family and on name.
      if (acp.columnFamily.equals(columnFamily)
          && (((columnPrefix == null) && (acp.getColumnPrefix() == null))
              || (acp.getColumnPrefix().equals(columnPrefix)))) {
        return acp;
      }
    }

    // Default to null
    return null;
  }

}
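
In practice a prefix behaves like a namespaced column. The following is a minimal usage sketch, not part of the patch: `mutator` and `result` are assumed to come from the surrounding writer/reader code, and the qualifier and value are made up.

    // Hypothetical round trip through one prefixed column ("i!startedOn").
    byte[] rowKey =
        ApplicationRowKey.getRowKey("cluster1", "user1", "flow1", 1L, "app_1");
    // write: the "i" prefix is prepended to the qualifier internally
    ApplicationColumnPrefix.INFO.store(rowKey, mutator, "startedOn", null,
        1425016501000L);
    // read back from a Result fetched for the same row
    Object started = ApplicationColumnPrefix.INFO.readResult(result, "startedOn");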
@@ -0,0 +1,67 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

/**
 * Represents a rowkey for the application table.
 */
public class ApplicationRowKey {
  // TODO: more methods are needed for this class.

  // TODO: API needs to be cleaned up.

  /**
   * Constructs a row key for the application table as follows:
   * {@code clusterId!userName!flowId!flowRunId!AppId}
   *
   * @param clusterId
   * @param userId
   * @param flowId
   * @param flowRunId
   * @param appId
   * @return byte array with the row key
   */
  public static byte[] getRowKey(String clusterId, String userId,
      String flowId, Long flowRunId, String appId) {
    byte[] first =
        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
            flowId));
    // Note that flowRunId is a long, so we can't encode them all at the same
    // time.
    byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId));
    byte[] third = Bytes.toBytes(appId);
    return Separator.QUALIFIERS.join(first, second, third);
  }

  /**
   * Converts a timestamp into its inverse timestamp to be used in (row) keys
   * where we want the most recent timestamp at the top of the table
   * (scans start at the most recent timestamp first).
   *
   * @param key value to be inverted so that the latest version will be first
   *          in a scan.
   * @return inverted long
   */
  public static long invert(Long key) {
    return Long.MAX_VALUE - key;
  }

}
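
A standalone illustration, not part of the patch, of what invert() buys, with made-up run ids: the later run encodes to the smaller value, so it sorts first in an HBase forward scan.

    public class InvertDemo {
      public static void main(String[] args) {
        long earlierRun = 1002345678919L;
        long laterRun = earlierRun + 1;
        // Long.MAX_VALUE - key reverses the natural ordering of the ids.
        System.out.println(ApplicationRowKey.invert(laterRun)
            < ApplicationRowKey.invert(earlierRun));
        // prints true: the most recent run comes first in row-key order
      }
    }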

@@ -0,0 +1,164 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;

/**
 * The application table has column families: info, config and metrics. Info
 * stores information about a YARN application entity, config stores
 * configuration data of a YARN application, metrics stores the metrics of a
 * YARN application. This table is entirely analogous to the entity table but
 * created for better performance.
 *
 * Example application table record:
 *
 * <pre>
 * |-------------------------------------------------------------------------|
 * | Row        | Column Family                | Column Family| Column Family|
 * | key        | info                         | metrics      | config       |
 * |-------------------------------------------------------------------------|
 * | clusterId! | id:appId                     | metricId1:   | configKey1:  |
 * | userName!  |                              | metricValue1 | configValue1 |
 * | flowId!    | created_time:                | @timestamp1  |              |
 * | flowRunId! | 1392993084018                |              | configKey2:  |
 * | AppId      |                              | metricId1:   | configValue2 |
 * |            | modified_time:               | metricValue2 |              |
 * |            | 1392995081012                | @timestamp2  |              |
 * |            |                              |              |              |
 * |            | i!infoKey:                   | metricId2:   |              |
 * |            | infoValue                    | metricValue1 |              |
 * |            |                              | @timestamp2  |              |
 * |            | r!relatesToKey:              |              |              |
 * |            | id3?id4?id5                  |              |              |
 * |            |                              |              |              |
 * |            | s!isRelatedToKey:            |              |              |
 * |            | id7?id9?id6                  |              |              |
 * |            |                              |              |              |
 * |            | e!eventId?timestamp?infoKey: |              |              |
 * |            | eventInfoValue               |              |              |
 * |            |                              |              |              |
 * |            | flowVersion:                 |              |              |
 * |            | versionValue                 |              |              |
 * |-------------------------------------------------------------------------|
 * </pre>
 */
public class ApplicationTable extends BaseTable<ApplicationTable> {
  /** application prefix */
  private static final String PREFIX =
      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".application";

  /** config param name that specifies the application table name */
  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";

  /**
   * config param name that specifies the TTL for metrics column family in
   * application table
   */
  private static final String METRICS_TTL_CONF_NAME = PREFIX
      + ".table.metrics.ttl";

  /** default value for application table name */
  private static final String DEFAULT_TABLE_NAME =
      "timelineservice.application";

  /** default TTL is 30 days for metrics timeseries */
  private static final int DEFAULT_METRICS_TTL = 2592000;

  /** default max number of versions */
  private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;

  private static final Log LOG = LogFactory.getLog(ApplicationTable.class);

  public ApplicationTable() {
    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
   * (org.apache.hadoop.hbase.client.Admin,
   * org.apache.hadoop.conf.Configuration)
   */
  public void createTable(Admin admin, Configuration hbaseConf)
      throws IOException {

    TableName table = getTableName(hbaseConf);
    if (admin.tableExists(table)) {
      // do not disable / delete existing table
      // similar to the approach taken by map-reduce jobs when
      // output directory exists
      throw new IOException("Table " + table.getNameAsString()
          + " already exists.");
    }

    HTableDescriptor applicationTableDescp = new HTableDescriptor(table);
    HColumnDescriptor infoCF =
        new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes());
    infoCF.setBloomFilterType(BloomType.ROWCOL);
    applicationTableDescp.addFamily(infoCF);

    HColumnDescriptor configCF =
        new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes());
    configCF.setBloomFilterType(BloomType.ROWCOL);
    configCF.setBlockCacheEnabled(true);
    applicationTableDescp.addFamily(configCF);

    HColumnDescriptor metricsCF =
        new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes());
    applicationTableDescp.addFamily(metricsCF);
    metricsCF.setBlockCacheEnabled(true);
    // always keep 1 version (the latest)
    metricsCF.setMinVersions(1);
    metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
        DEFAULT_METRICS_TTL));
    applicationTableDescp
        .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
    applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
    admin.createTable(applicationTableDescp,
        TimelineHBaseSchemaConstants.getUsernameSplits());
    LOG.info("Status of table creation for " + table.getNameAsString() + "="
        + admin.tableExists(table));
  }

  /**
   * @param metricsTTL time to live parameter for the metrics in this table.
   * @param hbaseConf configuration in which to set the metrics TTL config
   *          variable.
   */
  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
  }

}
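
A hedged sketch, not part of the patch, of how this table might be created against a running HBase cluster with a shorter 7-day metrics TTL; the connection boilerplate is standard HBase client API.

    Configuration hbaseConf = HBaseConfiguration.create();
    ApplicationTable appTable = new ApplicationTable();
    // 7 days in seconds; overrides the 30-day default before table creation
    appTable.setMetricsTTL(7 * 24 * 60 * 60, hbaseConf);
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
        Admin admin = conn.getAdmin()) {
      appTable.createTable(admin, hbaseConf); // throws IOException if it exists
    }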

@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@@ -157,7 +157,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
   */
-  public <T> NavigableMap<String, NavigableMap<Long, T>>
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
      readResultsWithTimestamps(Result result) throws IOException {
    return column.readResultsWithTimestamps(result, columnPrefixBytes);
  }
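
The rename from <T> to <V> is cosmetic, aligning the method with the ColumnPrefix interface; callers still bind the type at the call site, for instance reading a metric time series as Numbers. A sketch, assuming a Result already in scope:

    NavigableMap<String, NavigableMap<Long, Number>> metricSeries =
        EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);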

@@ -40,36 +40,35 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
 * Example entity table record:
 *
 * <pre>
-* |--------------------------------------------------------------------|
+* |-------------------------------------------------------------------------|
 * | Row        | Column Family                | Column Family| Column Family|
 * | key        | info                         | metrics      | config       |
-* |--------------------------------------------------------------------|
+* |-------------------------------------------------------------------------|
 * | userName!  | id:entityId                  | metricId1:   | configKey1:  |
 * | clusterId! |                              | metricValue1 | configValue1 |
 * | flowId!    | type:entityType              | @timestamp1  |              |
 * | flowRunId! |                              |              | configKey2:  |
 * | AppId!     | created_time:                | metriciD1:   | configValue2 |
 * | entityType!| 1392993084018                | metricValue2 |              |
 * | entityId   |                              | @timestamp2  |              |
 * |            | modified_time:               |              |              |
 * |            | 1392995081012                | metricId2:   |              |
 * |            |                              | metricValue1 |              |
 * |            | i!infoKey:                   | @timestamp2  |              |
 * |            | infoValue                    |              |              |
 * |            |                              |              |              |
 * |            | r!relatesToKey:              |              |              |
 * |            | id3?id4?id5                  |              |              |
 * |            |                              |              |              |
 * |            | s!isRelatedToKey             |              |              |
 * |            | id7?id9?id6                  |              |              |
 * |            |                              |              |              |
-* |            | e!eventId?eventInfoKey:      |              |              |
+* |            | e!eventId?timestamp?infoKey: |              |              |
 * |            | eventInfoValue               |              |              |
-* |            | @timestamp                   |              |              |
 * |            |                              |              |              |
 * |            | flowVersion:                 |              |              |
 * |            | versionValue                 |              |              |
-* |--------------------------------------------------------------------|
+* |-------------------------------------------------------------------------|
 * </pre>
 */
public class EntityTable extends BaseTable<EntityTable> {

@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

@@ -47,6 +48,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;

@@ -60,7 +65,15 @@ import org.junit.BeforeClass;
import org.junit.Test;

/**
- * @throws Exception
+ * Various tests to test writing entities to HBase and reading them back from
+ * it.
+ *
+ * It uses a single HBase mini-cluster for all tests which is a little more
+ * realistic, and helps test correctness in the presence of other data.
+ *
+ * Each test uses a different cluster name to be able to handle its own data
+ * even if other records exist in the table. Use a different cluster name if
+ * you add a new test.
 */
public class TestHBaseTimelineWriterImpl {

@@ -78,6 +91,199 @@ public class TestHBaseTimelineWriterImpl {
        .createTable(util.getHBaseAdmin(), util.getConfiguration());
    new AppToFlowTable()
        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new ApplicationTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+  }
+
+  @Test
+  public void testWriteApplicationToHBase() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    ApplicationEntity entity = new ApplicationEntity();
+    String id = "hello";
+    entity.setId(id);
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_write_app";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+      hbi.stop();
+
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+      assertEquals(16, result.size());
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          id));
+
+      // check info column family
+      String id1 = ApplicationColumn.ID.readResult(result).toString();
+      assertEquals(id, id1);
+
+      Number val =
+          (Number) ApplicationColumn.CREATED_TIME.readResult(result);
+      Long cTime1 = val.longValue();
+      assertEquals(cTime1, cTime);
+
+      val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
+      Long mTime1 = val.longValue();
+      assertEquals(mTime1, mTime);
+
+      Map<String, Object> infoColumns =
+          ApplicationColumnPrefix.INFO.readResults(result);
+      assertEquals(infoMap.size(), infoColumns.size());
+      for (String infoItem : infoMap.keySet()) {
+        assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
+      }
+
+      // Remember isRelatedTo is of type Map<String, Set<String>>
+      for (String isRelatedToKey : isRelatedTo.keySet()) {
+        Object isRelatedToValue =
+            ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
+                isRelatedToKey);
+        String compoundValue = isRelatedToValue.toString();
+        // id7?id9?id6
+        Set<String> isRelatedToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+            isRelatedToValues.size());
+        for (String v : isRelatedTo.get(isRelatedToKey)) {
+          assertTrue(isRelatedToValues.contains(v));
+        }
+      }
+
+      // RelatesTo
+      for (String relatesToKey : relatesTo.keySet()) {
+        String compoundValue =
+            ApplicationColumnPrefix.RELATES_TO.readResult(result,
+                relatesToKey).toString();
+        // id3?id4?id5
+        Set<String> relatesToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(relatesTo.get(relatesToKey).size(),
+            relatesToValues.size());
+        for (String v : relatesTo.get(relatesToKey)) {
+          assertTrue(relatesToValues.contains(v));
+        }
+      }
+
+      // Configuration
+      Map<String, Object> configColumns =
+          ApplicationColumnPrefix.CONFIG.readResults(result);
+      assertEquals(conf.size(), configColumns.size());
+      for (String configItem : conf.keySet()) {
+        assertEquals(conf.get(configItem), configColumns.get(configItem));
+      }
+
+      NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+
+      NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+      // We got metrics back
+      assertNotNull(metricMap);
+      // Same number of metrics as we wrote
+      assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
+
+      // Iterate over original metrics and confirm that they are present
+      // here.
+      for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+        assertEquals(metricEntry.getValue(),
+            metricMap.get(metricEntry.getKey()));
+      }
+
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          id, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
+    }
  }

  @Test

@@ -154,7 +360,7 @@ public class TestHBaseTimelineWriterImpl {
      hbr = new HBaseTimelineReaderImpl();
      hbr.init(c1);
      hbr.start();
-      String cluster = "cluster1";
+      String cluster = "cluster_test_write_entity";
      String user = "user1";
      String flow = "some_flow_name";
      String flowVersion = "AB7822C10F1111";

@@ -268,7 +474,8 @@ public class TestHBaseTimelineWriterImpl {
      assertEquals(17, colCount);

      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
          appName, entity.getType(), null, null, null, null, null, null, null,
          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));

@@ -284,10 +491,6 @@ public class TestHBaseTimelineWriterImpl {
        hbr.close();
      }
    }
-
-    // Somewhat of a hack, not a separate test in order not to have to deal with
-    // test case order exectution.
-    testAdditionalEntity();
  }

  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,

@@ -299,14 +502,31 @@ public class TestHBaseTimelineWriterImpl {
    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
    return true;
  }

-  private void testAdditionalEntity() throws IOException {
+  private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
+      String user, String flow, Long runid, String appName) {
+
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+
+    assertTrue(rowKeyComponents.length == 5);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    return true;
+  }
+
+  @Test
+  public void testEvents() throws IOException {
    TimelineEvent event = new TimelineEvent();
    String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
    event.setId(eventId);
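
The new helper asserts the five-part application row key layout, cluster!user!flow!invertedRunId!appId, two parts shorter than the seven-part entity row key. A quick sketch with made-up values, not part of the patch:

    byte[] rowKey = ApplicationRowKey.getRowKey("c1", "u1", "f1", 42L, "app_1");
    byte[][] parts = Separator.QUALIFIERS.split(rowKey, -1);
    System.out.println(parts.length); // 5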

@@ -333,7 +553,7 @@ public class TestHBaseTimelineWriterImpl {
      hbr = new HBaseTimelineReaderImpl();
      hbr.init(c1);
      hbr.start();
-      String cluster = "cluster2";
+      String cluster = "cluster_test_events";
      String user = "user2";
      String flow = "other_flow_name";
      String flowVersion = "1111F01C2287BA";

@@ -341,50 +561,46 @@ public class TestHBaseTimelineWriterImpl {
      String appName = "some app name";
      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
      hbi.stop();
-      // scan the table and see that entity exists
-      Scan s = new Scan();
-      byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
-      s.setStartRow(startRow);
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
      Connection conn = ConnectionFactory.createConnection(c1);
-      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+      Result result = new ApplicationTable().getResult(c1, conn, get);

-      int rowCount = 0;
-      for (Result result : scanner) {
-        if (result != null && !result.isEmpty()) {
-          rowCount++;
+      assertTrue(result != null);

      // check the row key
      byte[] row1 = result.getRow();
-      assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
-          entity));
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          appName));

      Map<String, Object> eventsResult =
-          EntityColumnPrefix.EVENT.readResults(result);
+          ApplicationColumnPrefix.EVENT.readResults(result);
      // there should be only one event
      assertEquals(1, eventsResult.size());
      // key name for the event
      byte[] compoundColumnQualifierBytes =
          Separator.VALUES.join(Bytes.toBytes(eventId),
              Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
              Bytes.toBytes(expKey));
      String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-      for (Map.Entry<String, Object> e :
-          eventsResult.entrySet()) {
+      for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
        // the value key must match
        assertEquals(valueKey, e.getKey());
        Object value = e.getValue();
        // there should be only one timestamp and value
        assertEquals(expVal, value.toString());
      }
-        }
-      }
-      assertEquals(1, rowCount);

      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
          appName, entity.getType(), null, null, null, null, null, null, null,
          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
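
The event assertions above hinge on the compound column qualifier: event id, inverted timestamp, and info key joined by the VALUES separator. A sketch with made-up values, not part of the patch:

    byte[] qualifier = Separator.VALUES.join(
        Bytes.toBytes("start_event"),
        Bytes.toBytes(TimelineWriterUtils.invert(1425016501000L)),
        Bytes.toBytes("infoKey1"));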

@@ -410,7 +626,7 @@ public class TestHBaseTimelineWriterImpl {
  }

  @Test
-  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+  public void testEventsWithEmptyInfo() throws IOException {
    TimelineEvent event = new TimelineEvent();
    String eventId = "foo_event_id";
    event.setId(eventId);

@@ -435,7 +651,7 @@ public class TestHBaseTimelineWriterImpl {
      hbr = new HBaseTimelineReaderImpl();
      hbr.init(c1);
      hbr.start();
-      String cluster = "cluster_emptyeventkey";
+      String cluster = "cluster_test_empty_eventkey";
      String user = "user_emptyeventkey";
      String flow = "other_flow_name";
      String flowVersion = "1111F01C2287BA";

@@ -487,7 +703,8 @@ public class TestHBaseTimelineWriterImpl {
      assertEquals(1, rowCount);

      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
          appName, entity.getType(), null, null, null, null, null, null, null,
          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));