YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)

This commit is contained in:
Vrushali 2015-09-22 13:42:30 -07:00 committed by Sangjin Lee
parent a68e383921
commit 10fa6da7d8
24 changed files with 1930 additions and 592 deletions

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import java.util.Collection;
import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
import javax.xml.bind.annotation.XmlElement;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Entity that represents a record for flow activity. It's essentially a
* container entity for flow runs with limited information.
*/
@Public
@Unstable
public class FlowActivityEntity extends TimelineEntity {
public static final String CLUSTER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "CLUSTER";
public static final String DATE_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "DATE";
public static final String USER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
public static final String FLOW_NAME_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>();
public FlowActivityEntity() {
super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
// set config to null
setConfigs(null);
}
public FlowActivityEntity(String cluster, long time, String user,
String flowName) {
this();
setCluster(cluster);
setDate(time);
setUser(user);
setFlowName(flowName);
}
public FlowActivityEntity(TimelineEntity entity) {
super(entity);
if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
throw new IllegalArgumentException("Incompatible entity type: " +
getId());
}
// set config to null
setConfigs(null);
}
@XmlElement(name = "id")
@Override
public String getId() {
// flow activity: cluster/day/user@flow_name
String id = super.getId();
if (id == null) {
StringBuilder sb = new StringBuilder();
sb.append(getCluster());
sb.append('/');
sb.append(getDate().getTime());
sb.append('/');
sb.append(getUser());
sb.append('@');
sb.append(getFlowName());
id = sb.toString();
setId(id);
}
return id;
}
@Override
public int compareTo(TimelineEntity entity) {
int comparison = getType().compareTo(entity.getType());
if (comparison == 0) {
// order by cluster, date (descending), user, and flow name
FlowActivityEntity other = (FlowActivityEntity)entity;
int clusterComparison = getCluster().compareTo(other.getCluster());
if (clusterComparison != 0) {
return clusterComparison;
}
int dateComparisonDescending =
(int)(other.getDate().getTime() - getDate().getTime()); // descending
if (dateComparisonDescending != 0) {
return dateComparisonDescending; // descending
}
int userComparison = getUser().compareTo(other.getUser());
if (userComparison != 0) {
return userComparison;
}
return getFlowName().compareTo(other.getFlowName());
} else {
return comparison;
}
}
/**
* Reuse the base class equals method.
*/
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
/**
* Reuse the base class hashCode method.
*/
@Override
public int hashCode() {
return super.hashCode();
}
public String getCluster() {
return (String)getInfo().get(CLUSTER_INFO_KEY);
}
public void setCluster(String cluster) {
addInfo(CLUSTER_INFO_KEY, cluster);
}
public Date getDate() {
return (Date)getInfo().get(DATE_INFO_KEY);
}
public void setDate(long time) {
Date date = new Date(time);
addInfo(DATE_INFO_KEY, date);
}
public String getUser() {
return (String)getInfo().get(USER_INFO_KEY);
}
public void setUser(String user) {
addInfo(USER_INFO_KEY, user);
}
public String getFlowName() {
return (String)getInfo().get(FLOW_NAME_INFO_KEY);
}
public void setFlowName(String flowName) {
addInfo(FLOW_NAME_INFO_KEY, flowName);
}
public void addFlowRun(FlowRunEntity run) {
flowRuns.add(run);
}
public void addFlowRuns(Collection<FlowRunEntity> runs) {
flowRuns.addAll(runs);
}
@XmlElement(name = "flowruns")
public NavigableSet<FlowRunEntity> getFlowRuns() {
return flowRuns;
}
public int getNumberOfRuns() {
return flowRuns.size();
}
}

View File

@ -17,14 +17,14 @@
*/ */
package org.apache.hadoop.yarn.api.records.timelineservice; package org.apache.hadoop.yarn.api.records.timelineservice;
import javax.xml.bind.annotation.XmlElement;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import javax.xml.bind.annotation.XmlElement;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class FlowEntity extends HierarchicalTimelineEntity { public class FlowRunEntity extends HierarchicalTimelineEntity {
public static final String USER_INFO_KEY = public static final String USER_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER"; TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
public static final String FLOW_NAME_INFO_KEY = public static final String FLOW_NAME_INFO_KEY =
@ -33,22 +33,28 @@ public class FlowEntity extends HierarchicalTimelineEntity {
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION"; TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION";
public static final String FLOW_RUN_ID_INFO_KEY = public static final String FLOW_RUN_ID_INFO_KEY =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID"; TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID";
public static final String FLOW_RUN_END_TIME =
TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME";
public FlowEntity() { public FlowRunEntity() {
super(TimelineEntityType.YARN_FLOW.toString()); super(TimelineEntityType.YARN_FLOW_RUN.toString());
// set config to null
setConfigs(null);
} }
public FlowEntity(TimelineEntity entity) { public FlowRunEntity(TimelineEntity entity) {
super(entity); super(entity);
if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) { if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) {
throw new IllegalArgumentException("Incompatible entity type: " + getId()); throw new IllegalArgumentException("Incompatible entity type: " + getId());
} }
// set config to null
setConfigs(null);
} }
@XmlElement(name = "id") @XmlElement(name = "id")
@Override @Override
public String getId() { public String getId() {
//Flow id schema: user@flow_name(or id)/version/run_id //Flow id schema: user@flow_name(or id)/run_id
String id = super.getId(); String id = super.getId();
if (id == null) { if (id == null) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -56,8 +62,6 @@ public class FlowEntity extends HierarchicalTimelineEntity {
sb.append('@'); sb.append('@');
sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString()); sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
sb.append('/'); sb.append('/');
sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString());
sb.append('/');
sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString()); sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
id = sb.toString(); id = sb.toString();
setId(id); setId(id);
@ -66,8 +70,7 @@ public class FlowEntity extends HierarchicalTimelineEntity {
} }
public String getUser() { public String getUser() {
Object user = getInfo().get(USER_INFO_KEY); return (String)getInfo().get(USER_INFO_KEY);
return user == null ? null : user.toString();
} }
public void setUser(String user) { public void setUser(String user) {
@ -75,8 +78,7 @@ public class FlowEntity extends HierarchicalTimelineEntity {
} }
public String getName() { public String getName() {
Object name = getInfo().get(FLOW_NAME_INFO_KEY); return (String)getInfo().get(FLOW_NAME_INFO_KEY);
return name == null ? null : name.toString();
} }
public void setName(String name) { public void setName(String name) {
@ -84,8 +86,7 @@ public class FlowEntity extends HierarchicalTimelineEntity {
} }
public String getVersion() { public String getVersion() {
Object version = getInfo().get(FLOW_VERSION_INFO_KEY); return (String)getInfo().get(FLOW_VERSION_INFO_KEY);
return version == null ? null : version.toString();
} }
public void setVersion(String version) { public void setVersion(String version) {
@ -100,4 +101,21 @@ public class FlowEntity extends HierarchicalTimelineEntity {
public void setRunId(long runId) { public void setRunId(long runId) {
addInfo(FLOW_RUN_ID_INFO_KEY, runId); addInfo(FLOW_RUN_ID_INFO_KEY, runId);
} }
public long getStartTime() {
return getCreatedTime();
}
public void setStartTime(long startTime) {
setCreatedTime(startTime);
}
public long getMaxEndTime() {
Object time = getInfo().get(FLOW_RUN_END_TIME);
return time == null ? 0L : (Long)time;
}
public void setMaxEndTime(long endTime) {
addInfo(FLOW_RUN_END_TIME, endTime);
}
} }

View File

@ -24,21 +24,25 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable @InterfaceStability.Unstable
public enum TimelineEntityType { public enum TimelineEntityType {
YARN_CLUSTER, YARN_CLUSTER,
YARN_FLOW, YARN_FLOW_RUN,
YARN_APPLICATION, YARN_APPLICATION,
YARN_APPLICATION_ATTEMPT, YARN_APPLICATION_ATTEMPT,
YARN_CONTAINER, YARN_CONTAINER,
YARN_USER, YARN_USER,
YARN_QUEUE; YARN_QUEUE,
YARN_FLOW_ACTIVITY;
/**
* Whether the input type can be a parent of this entity.
*/
public boolean isParent(TimelineEntityType type) { public boolean isParent(TimelineEntityType type) {
switch (this) { switch (this) {
case YARN_CLUSTER: case YARN_CLUSTER:
return false; return false;
case YARN_FLOW: case YARN_FLOW_RUN:
return YARN_FLOW == type || YARN_CLUSTER == type; return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION: case YARN_APPLICATION:
return YARN_FLOW == type || YARN_CLUSTER == type; return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION_ATTEMPT: case YARN_APPLICATION_ATTEMPT:
return YARN_APPLICATION == type; return YARN_APPLICATION == type;
case YARN_CONTAINER: case YARN_CONTAINER:
@ -50,12 +54,15 @@ public enum TimelineEntityType {
} }
} }
/**
* Whether the input type can be a child of this entity.
*/
public boolean isChild(TimelineEntityType type) { public boolean isChild(TimelineEntityType type) {
switch (this) { switch (this) {
case YARN_CLUSTER: case YARN_CLUSTER:
return YARN_FLOW == type || YARN_APPLICATION == type; return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_FLOW: case YARN_FLOW_RUN:
return YARN_FLOW == type || YARN_APPLICATION == type; return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_APPLICATION: case YARN_APPLICATION:
return YARN_APPLICATION_ATTEMPT == type; return YARN_APPLICATION_ATTEMPT == type;
case YARN_APPLICATION_ATTEMPT: case YARN_APPLICATION_ATTEMPT:
@ -68,4 +75,12 @@ public enum TimelineEntityType {
return false; return false;
} }
} }
/**
* Whether the type of this entity matches the type indicated by the input
* argument.
*/
public boolean matches(String typeString) {
return toString().equals(typeString);
}
} }

View File

@ -182,14 +182,14 @@ public class TestTimelineServiceRecords {
ClusterEntity cluster = new ClusterEntity(); ClusterEntity cluster = new ClusterEntity();
cluster.setId("test cluster id"); cluster.setId("test cluster id");
FlowEntity flow1 = new FlowEntity(); FlowRunEntity flow1 = new FlowRunEntity();
//flow1.setId("test flow id 1"); //flow1.setId("test flow id 1");
flow1.setUser(user.getId()); flow1.setUser(user.getId());
flow1.setName("test flow name 1"); flow1.setName("test flow name 1");
flow1.setVersion("test flow version 1"); flow1.setVersion("test flow version 1");
flow1.setRunId(1L); flow1.setRunId(1L);
FlowEntity flow2 = new FlowEntity(); FlowRunEntity flow2 = new FlowRunEntity();
//flow2.setId("test flow run id 2"); //flow2.setId("test flow run id 2");
flow2.setUser(user.getId()); flow2.setUser(user.getId());
flow2.setName("test flow name 2"); flow2.setName("test flow name 2");
@ -213,19 +213,19 @@ public class TestTimelineServiceRecords {
ApplicationAttemptId.newInstance( ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1).toString()); ApplicationId.newInstance(0, 1), 1), 1).toString());
cluster.addChild(TimelineEntityType.YARN_FLOW.toString(), flow1.getId()); cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
flow1 flow1
.setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId()); .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId());
flow1.addChild(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
flow2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow1.getId()); flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId()); flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId()); flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId());
app1.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttempt.getId()); appAttempt.getId());
appAttempt appAttempt
.setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId()); .setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
app2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(), appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(),
container.getId()); container.getId());
container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),

View File

@ -107,7 +107,7 @@ public class TestTimelineServiceClientIntegration {
client.start(); client.start();
ClusterEntity cluster = new ClusterEntity(); ClusterEntity cluster = new ClusterEntity();
cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID); cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
FlowEntity flow = new FlowEntity(); FlowRunEntity flow = new FlowRunEntity();
flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
flow.setName("test_flow_name"); flow.setName("test_flow_name");
flow.setVersion("test_flow_version"); flow.setVersion("test_flow_version");

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEnti
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -205,8 +205,8 @@ public class TimelineCollectorWebService {
case YARN_CLUSTER: case YARN_CLUSTER:
entitiesToReturn.addEntity(new ClusterEntity(entity)); entitiesToReturn.addEntity(new ClusterEntity(entity));
break; break;
case YARN_FLOW: case YARN_FLOW_RUN:
entitiesToReturn.addEntity(new FlowEntity(entity)); entitiesToReturn.addEntity(new FlowRunEntity(entity));
break; break;
case YARN_APPLICATION: case YARN_APPLICATION:
entitiesToReturn.addEntity(new ApplicationEntity(entity)); entitiesToReturn.addEntity(new ApplicationEntity(entity));

View File

@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
/**
* Timeline entity reader for application entities that are stored in the
* application table.
*/
class ApplicationEntityReader extends GenericEntityReader {
private static final ApplicationTable APPLICATION_TABLE =
new ApplicationTable();
public ApplicationEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
Long limit, Long createdTimeBegin, Long createdTimeEnd,
Long modifiedTimeBegin, Long modifiedTimeEnd,
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve);
}
public ApplicationEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
}
/**
* Uses the {@link ApplicationTable}.
*/
protected BaseTable<?> getTable() {
return APPLICATION_TABLE;
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
byte[] rowKey =
ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
return table.getResult(hbaseConf, conn, get);
}
@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
Connection conn) throws IOException {
// If getEntities() is called for an application, there can be at most
// one entity. If the entity passes the filter, it is returned. Otherwise,
// an empty set is returned.
byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
flowRunId, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Result result = table.getResult(hbaseConf, conn, get);
TimelineEntity entity = parseEntity(result);
Set<Result> set;
if (entity != null) {
set = Collections.singleton(result);
} else {
set = Collections.emptySet();
}
return set;
}
@Override
protected TimelineEntity parseEntity(Result result) throws IOException {
if (result == null || result.isEmpty()) {
return null;
}
TimelineEntity entity = new TimelineEntity();
entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
String entityId = ApplicationColumn.ID.readResult(result).toString();
entity.setId(entityId);
// fetch created time
Number createdTime =
(Number)ApplicationColumn.CREATED_TIME.readResult(result);
entity.setCreatedTime(createdTime.longValue());
if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
entity.getCreatedTime() > createdTimeEnd)) {
return null;
}
// fetch modified time
Number modifiedTime =
(Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
entity.setModifiedTime(modifiedTime.longValue());
if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
entity.getModifiedTime() > modifiedTimeEnd)) {
return null;
}
// fetch is related to entities
boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
true);
if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
entity.getIsRelatedToEntities(), isRelatedTo)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
}
// fetch relates to entities
boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
false);
if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
entity.getRelatesToEntities(), relatesTo)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
// fetch info
boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
if (checkInfo &&
!TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.INFO)) {
entity.getInfo().clear();
}
}
// fetch configs
boolean checkConfigs = configFilters != null && configFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
if (checkConfigs && !TimelineReaderUtils.matchFilters(
entity.getConfigs(), configFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.CONFIGS)) {
entity.getConfigs().clear();
}
}
// fetch events
boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
readEvents(entity, result, true);
if (checkEvents && !TimelineReaderUtils.matchEventFilters(
entity.getEvents(), eventFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.EVENTS)) {
entity.getEvents().clear();
}
}
// fetch metrics
boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
entity.getMetrics(), metricFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.METRICS)) {
entity.getMetrics().clear();
}
}
return entity;
}
}

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import com.google.common.base.Preconditions;
/**
* Timeline entity reader for flow activity entities that are stored in the
* flow activity table.
*/
class FlowActivityEntityReader extends TimelineEntityReader {
private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
new FlowActivityTable();
public FlowActivityEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
Long limit, Long createdTimeBegin, Long createdTimeEnd,
Long modifiedTimeBegin, Long modifiedTimeEnd,
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve);
}
public FlowActivityEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
}
/**
* Uses the {@link FlowActivityTable}.
*/
@Override
protected BaseTable<?> getTable() {
return FLOW_ACTIVITY_TABLE;
}
/**
* Since this is strictly sorted by the row key, it is sufficient to collect
* the first results as specified by the limit.
*/
@Override
public Set<TimelineEntity> readEntities(Configuration hbaseConf,
Connection conn) throws IOException {
validateParams();
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
Iterable<Result> results = getResults(hbaseConf, conn);
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
if (entity == null) {
continue;
}
entities.add(entity);
if (entities.size() == limit) {
break;
}
}
return entities;
}
@Override
protected void validateParams() {
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
}
@Override
protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
}
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
throw new UnsupportedOperationException(
"we don't support a single entity query");
}
@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Scan scan = new Scan();
scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
// use the page filter to limit the result to the page size
// the scanner may still return more than the limit; therefore we need to
// read the right number as we iterate
scan.setFilter(new PageFilter(limit));
return table.getResultScanner(hbaseConf, conn, scan);
}
@Override
protected TimelineEntity parseEntity(Result result) throws IOException {
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
long time = rowKey.getDayTimestamp();
String user = rowKey.getUserId();
String flowName = rowKey.getFlowId();
FlowActivityEntity flowActivity =
new FlowActivityEntity(clusterId, time, user, flowName);
// set the id
flowActivity.setId(flowActivity.getId());
// get the list of run ids along with the version that are associated with
// this flow on this day
Map<String, Object> runIdsMap =
FlowActivityColumnPrefix.RUN_ID.readResults(result);
for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
Long runId = Long.valueOf(e.getKey());
String version = (String)e.getValue();
FlowRunEntity flowRun = new FlowRunEntity();
flowRun.setUser(user);
flowRun.setName(flowName);
flowRun.setRunId(runId);
flowRun.setVersion(version);
// set the id
flowRun.setId(flowRun.getId());
flowActivity.addFlowRun(flowRun);
}
return flowActivity;
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import com.google.common.base.Preconditions;
/**
* Timeline entity reader for flow run entities that are stored in the flow run
* table.
*/
class FlowRunEntityReader extends TimelineEntityReader {
private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
public FlowRunEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
Long limit, Long createdTimeBegin, Long createdTimeEnd,
Long modifiedTimeBegin, Long modifiedTimeEnd,
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve);
}
public FlowRunEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
}
/**
* Uses the {@link FlowRunTable}.
*/
@Override
protected BaseTable<?> getTable() {
return FLOW_RUN_TABLE;
}
@Override
protected void validateParams() {
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
}
@Override
protected void augmentParams(Configuration hbaseConf, Connection conn) {
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
byte[] rowKey =
FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
return table.getResult(hbaseConf, conn, get);
}
@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
Connection conn) throws IOException {
throw new UnsupportedOperationException(
"multiple entity query is not supported");
}
@Override
protected TimelineEntity parseEntity(Result result) throws IOException {
FlowRunEntity flowRun = new FlowRunEntity();
flowRun.setUser(userId);
flowRun.setName(flowId);
flowRun.setRunId(flowRunId);
// read the start time
Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
if (startTime != null) {
flowRun.setStartTime(startTime);
}
// read the end time if available
Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
if (endTime != null) {
flowRun.setMaxEndTime(endTime);
}
// read the flow version
String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
if (version != null) {
flowRun.setVersion(version);
}
// read metrics
readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
// set the id
flowRun.setId(flowRun.getId());
return flowRun;
}
}

View File

@ -0,0 +1,389 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import com.google.common.base.Preconditions;
/**
* Timeline entity reader for generic entities that are stored in the entity
* table.
*/
class GenericEntityReader extends TimelineEntityReader {
private static final EntityTable ENTITY_TABLE = new EntityTable();
private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
private static final long DEFAULT_BEGIN_TIME = 0L;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
/**
* Used to look up the flow context.
*/
private final AppToFlowTable appToFlowTable = new AppToFlowTable();
public GenericEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
Long limit, Long createdTimeBegin, Long createdTimeEnd,
Long modifiedTimeBegin, Long modifiedTimeEnd,
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve);
}
public GenericEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
}
/**
* Uses the {@link EntityTable}.
*/
protected BaseTable<?> getTable() {
return ENTITY_TABLE;
}
private FlowContext lookupFlowContext(String clusterId, String appId,
Configuration hbaseConf, Connection conn) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
Get get = new Get(rowKey);
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
return new FlowContext(
AppToFlowColumn.FLOW_ID.readResult(result).toString(),
((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
} else {
throw new IOException(
"Unable to find the context flow ID and flow run ID for clusterId=" +
clusterId + ", appId=" + appId);
}
}
private static class FlowContext {
private final String flowId;
private final Long flowRunId;
public FlowContext(String flowId, Long flowRunId) {
this.flowId = flowId;
this.flowRunId = flowRunId;
}
}
@Override
protected void validateParams() {
Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(appId, "appId shouldn't be null");
Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
if (singleEntityRead) {
Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
}
}
@Override
protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException {
// In reality both should be null or neither should be null
if (flowId == null || flowRunId == null) {
FlowContext context =
lookupFlowContext(clusterId, appId, hbaseConf, conn);
flowId = context.flowId;
flowRunId = context.flowRunId;
}
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
}
if (createdTimeBegin == null) {
createdTimeBegin = DEFAULT_BEGIN_TIME;
}
if (createdTimeEnd == null) {
createdTimeEnd = DEFAULT_END_TIME;
}
if (modifiedTimeBegin == null) {
modifiedTimeBegin = DEFAULT_BEGIN_TIME;
}
if (modifiedTimeEnd == null) {
modifiedTimeEnd = DEFAULT_END_TIME;
}
}
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
byte[] rowKey =
EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
entityType, entityId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
return table.getResult(hbaseConf, conn, get);
}
@Override
protected Iterable<Result> getResults(Configuration hbaseConf,
Connection conn) throws IOException {
// Scan through part of the table to find the entities belong to one app
// and one type
Scan scan = new Scan();
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
clusterId, userId, flowId, flowRunId, appId, entityType));
scan.setMaxVersions(Integer.MAX_VALUE);
return table.getResultScanner(hbaseConf, conn, scan);
}
@Override
protected TimelineEntity parseEntity(Result result) throws IOException {
if (result == null || result.isEmpty()) {
return null;
}
TimelineEntity entity = new TimelineEntity();
String entityType = EntityColumn.TYPE.readResult(result).toString();
entity.setType(entityType);
String entityId = EntityColumn.ID.readResult(result).toString();
entity.setId(entityId);
// fetch created time
Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
entity.setCreatedTime(createdTime.longValue());
if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
entity.getCreatedTime() > createdTimeEnd)) {
return null;
}
// fetch modified time
Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
entity.setModifiedTime(modifiedTime.longValue());
if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
entity.getModifiedTime() > modifiedTimeEnd)) {
return null;
}
// fetch is related to entities
boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
entity.getIsRelatedToEntities(), isRelatedTo)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
}
// fetch relates to entities
boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
entity.getRelatesToEntities(), relatesTo)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
// fetch info
boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
if (checkInfo &&
!TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.INFO)) {
entity.getInfo().clear();
}
}
// fetch configs
boolean checkConfigs = configFilters != null && configFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
if (checkConfigs && !TimelineReaderUtils.matchFilters(
entity.getConfigs(), configFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.CONFIGS)) {
entity.getConfigs().clear();
}
}
// fetch events
boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
readEvents(entity, result, false);
if (checkEvents && !TimelineReaderUtils.matchEventFilters(
entity.getEvents(), eventFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.EVENTS)) {
entity.getEvents().clear();
}
}
// fetch metrics
boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
readMetrics(entity, result, EntityColumnPrefix.METRIC);
if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
entity.getMetrics(), metricFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.METRICS)) {
entity.getMetrics().clear();
}
}
return entity;
}
/**
* Helper method for reading relationship.
*/
protected <T> void readRelationship(
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isRelatedTo) throws IOException {
// isRelatedTo and relatesTo are of type Map<String, Set<String>>
Map<String, Object> columns = prefix.readResults(result);
for (Map.Entry<String, Object> column : columns.entrySet()) {
for (String id : Separator.VALUES.splitEncoded(
column.getValue().toString())) {
if (isRelatedTo) {
entity.addIsRelatedToEntity(column.getKey(), id);
} else {
entity.addRelatesToEntity(column.getKey(), id);
}
}
}
}
/**
* Helper method for reading key-value pairs for either info or config.
*/
protected <T> void readKeyValuePairs(
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isConfig) throws IOException {
// info and configuration are of type Map<String, Object or String>
Map<String, Object> columns = prefix.readResults(result);
if (isConfig) {
for (Map.Entry<String, Object> column : columns.entrySet()) {
entity.addConfig(column.getKey(), column.getValue().toString());
}
} else {
entity.addInfo(columns);
}
}
/**
* Read events from the entity table or the application table. The column name
* is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
* if there is no info associated with the event.
*
* See {@link EntityTable} and {@link ApplicationTable} for a more detailed
* schema description.
*/
protected void readEvents(TimelineEntity entity, Result result,
boolean isApplication) throws IOException {
Map<String, TimelineEvent> eventsMap = new HashMap<>();
Map<?, Object> eventsResult = isApplication ?
ApplicationColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result) :
EntityColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result);
for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
byte[][] karr = (byte[][])eventResult.getKey();
// the column name is of the form "eventId=timestamp=infoKey"
if (karr.length == 3) {
String id = Bytes.toString(karr[0]);
long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
TimelineEvent event = eventsMap.get(key);
if (event == null) {
event = new TimelineEvent();
event.setId(id);
event.setTimestamp(ts);
eventsMap.put(key, event);
}
// handle empty info
String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
if (infoKey != null) {
event.addInfo(infoKey, eventResult.getValue());
}
} else {
LOG.warn("incorrectly formatted column name: it will be discarded");
continue;
}
}
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
entity.addEvents(eventsSet);
}
}

View File

@ -20,13 +20,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException; import java.io.IOException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -34,47 +29,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import com.google.common.base.Preconditions;
public class HBaseTimelineReaderImpl public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader { extends AbstractService implements TimelineReader {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(HBaseTimelineReaderImpl.class); .getLog(HBaseTimelineReaderImpl.class);
private static final long DEFAULT_BEGIN_TIME = 0L;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
private Configuration hbaseConf = null; private Configuration hbaseConf = null;
private Connection conn; private Connection conn;
private EntityTable entityTable;
private AppToFlowTable appToFlowTable;
private ApplicationTable applicationTable;
public HBaseTimelineReaderImpl() { public HBaseTimelineReaderImpl() {
super(HBaseTimelineReaderImpl.class.getName()); super(HBaseTimelineReaderImpl.class.getName());
@ -85,9 +50,6 @@ public class HBaseTimelineReaderImpl
super.serviceInit(conf); super.serviceInit(conf);
hbaseConf = HBaseConfiguration.create(conf); hbaseConf = HBaseConfiguration.create(conf);
conn = ConnectionFactory.createConnection(hbaseConf); conn = ConnectionFactory.createConnection(hbaseConf);
entityTable = new EntityTable();
appToFlowTable = new AppToFlowTable();
applicationTable = new ApplicationTable();
} }
@Override @Override
@ -104,35 +66,10 @@ public class HBaseTimelineReaderImpl
String flowId, Long flowRunId, String appId, String entityType, String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) String entityId, EnumSet<Field> fieldsToRetrieve)
throws IOException { throws IOException {
validateParams(userId, clusterId, appId, entityType, entityId, true); TimelineEntityReader reader =
// In reality both should be null or neither should be null TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
if (flowId == null || flowRunId == null) { flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
FlowContext context = lookupFlowContext(clusterId, appId); return reader.readEntity(hbaseConf, conn);
flowId = context.flowId;
flowRunId = context.flowRunId;
}
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
boolean isApplication = isApplicationEntity(entityType);
byte[] rowKey = isApplication ?
ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
appId) :
EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
entityType, entityId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Result result = isApplication ?
applicationTable.getResult(hbaseConf, conn, get) :
entityTable.getResult(hbaseConf, conn, get);
return parseEntity(result, fieldsToRetrieve,
false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
}
private static boolean isApplicationEntity(String entityType) {
return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
} }
@Override @Override
@ -144,361 +81,12 @@ public class HBaseTimelineReaderImpl
Map<String, Object> infoFilters, Map<String, String> configFilters, Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters, Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) throws IOException { EnumSet<Field> fieldsToRetrieve) throws IOException {
validateParams(userId, clusterId, appId, entityType, null, false); TimelineEntityReader reader =
// In reality both should be null or neither should be null TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
if (flowId == null || flowRunId == null) { clusterId, flowId, flowRunId, appId, entityType, limit,
FlowContext context = lookupFlowContext(clusterId, appId); createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
flowId = context.flowId; modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
flowRunId = context.flowRunId; metricFilters, eventFilters, fieldsToRetrieve);
} return reader.readEntities(hbaseConf, conn);
if (limit == null) {
limit = TimelineReader.DEFAULT_LIMIT;
}
if (createdTimeBegin == null) {
createdTimeBegin = DEFAULT_BEGIN_TIME;
}
if (createdTimeEnd == null) {
createdTimeEnd = DEFAULT_END_TIME;
}
if (modifiedTimeBegin == null) {
modifiedTimeBegin = DEFAULT_BEGIN_TIME;
}
if (modifiedTimeEnd == null) {
modifiedTimeEnd = DEFAULT_END_TIME;
}
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
NavigableSet<TimelineEntity> entities = new TreeSet<>();
boolean isApplication = isApplicationEntity(entityType);
if (isApplication) {
// If getEntities() is called for an application, there can be at most
// one entity. If the entity passes the filter, it is returned. Otherwise,
// an empty set is returned.
byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
flowRunId, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Result result = applicationTable.getResult(hbaseConf, conn, get);
TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
eventFilters, metricFilters, isApplication);
if (entity != null) {
entities.add(entity);
}
} else {
// Scan through part of the table to find the entities belong to one app
// and one type
Scan scan = new Scan();
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
clusterId, userId, flowId, flowRunId, appId, entityType));
scan.setMaxVersions(Integer.MAX_VALUE);
ResultScanner scanner =
entityTable.getResultScanner(hbaseConf, conn, scan);
for (Result result : scanner) {
TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
true, createdTimeBegin, createdTimeEnd,
true, modifiedTimeBegin, modifiedTimeEnd,
isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
metricFilters, isApplication);
if (entity == null) {
continue;
}
if (entities.size() > limit) {
entities.pollLast();
}
entities.add(entity);
}
}
return entities;
}
private FlowContext lookupFlowContext(String clusterId, String appId)
throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
Get get = new Get(rowKey);
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
return new FlowContext(
AppToFlowColumn.FLOW_ID.readResult(result).toString(),
((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
} else {
throw new IOException(
"Unable to find the context flow ID and flow run ID for clusterId=" +
clusterId + ", appId=" + appId);
}
}
private static class FlowContext {
private String flowId;
private Long flowRunId;
public FlowContext(String flowId, Long flowRunId) {
this.flowId = flowId;
this.flowRunId = flowRunId;
}
}
private static void validateParams(String userId, String clusterId,
String appId, String entityType, String entityId, boolean checkEntityId) {
Preconditions.checkNotNull(userId, "userId shouldn't be null");
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
Preconditions.checkNotNull(appId, "appId shouldn't be null");
Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
if (checkEntityId) {
Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
}
}
private static TimelineEntity parseEntity(
Result result, EnumSet<Field> fieldsToRetrieve,
boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> eventFilters, Set<String> metricFilters,
boolean isApplication)
throws IOException {
if (result == null || result.isEmpty()) {
return null;
}
TimelineEntity entity = new TimelineEntity();
String entityType = isApplication ?
TimelineEntityType.YARN_APPLICATION.toString() :
EntityColumn.TYPE.readResult(result).toString();
entity.setType(entityType);
String entityId = isApplication ?
ApplicationColumn.ID.readResult(result).toString() :
EntityColumn.ID.readResult(result).toString();
entity.setId(entityId);
// fetch created time
Number createdTime = isApplication ?
(Number)ApplicationColumn.CREATED_TIME.readResult(result) :
(Number)EntityColumn.CREATED_TIME.readResult(result);
entity.setCreatedTime(createdTime.longValue());
if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
entity.getCreatedTime() > createdTimeEnd)) {
return null;
}
// fetch modified time
Number modifiedTime = isApplication ?
(Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
(Number)EntityColumn.MODIFIED_TIME.readResult(result);
entity.setModifiedTime(modifiedTime.longValue());
if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
entity.getModifiedTime() > modifiedTimeEnd)) {
return null;
}
// fetch is related to entities
boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
if (isApplication) {
readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
true);
} else {
readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
true);
}
if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
entity.getIsRelatedToEntities(), isRelatedTo)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
entity.getIsRelatedToEntities().clear();
}
}
// fetch relates to entities
boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
if (isApplication) {
readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
false);
} else {
readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
}
if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
entity.getRelatesToEntities(), relatesTo)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.RELATES_TO)) {
entity.getRelatesToEntities().clear();
}
}
// fetch info
boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
if (isApplication) {
readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
} else {
readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
}
if (checkInfo &&
!TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.INFO)) {
entity.getInfo().clear();
}
}
// fetch configs
boolean checkConfigs = configFilters != null && configFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
if (isApplication) {
readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
} else {
readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
}
if (checkConfigs && !TimelineReaderUtils.matchFilters(
entity.getConfigs(), configFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.CONFIGS)) {
entity.getConfigs().clear();
}
}
// fetch events
boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
readEvents(entity, result, isApplication);
if (checkEvents && !TimelineReaderUtils.matchEventFilters(
entity.getEvents(), eventFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.EVENTS)) {
entity.getEvents().clear();
}
}
// fetch metrics
boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
if (fieldsToRetrieve.contains(Field.ALL) ||
fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
readMetrics(entity, result, isApplication);
if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
entity.getMetrics(), metricFilters)) {
return null;
}
if (!fieldsToRetrieve.contains(Field.ALL) &&
!fieldsToRetrieve.contains(Field.METRICS)) {
entity.getMetrics().clear();
}
}
return entity;
}
private static <T> void readRelationship(
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isRelatedTo) throws IOException {
// isRelatedTo and relatesTo are of type Map<String, Set<String>>
Map<String, Object> columns = prefix.readResults(result);
for (Map.Entry<String, Object> column : columns.entrySet()) {
for (String id : Separator.VALUES.splitEncoded(
column.getValue().toString())) {
if (isRelatedTo) {
entity.addIsRelatedToEntity(column.getKey(), id);
} else {
entity.addRelatesToEntity(column.getKey(), id);
}
}
}
}
private static <T> void readKeyValuePairs(
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isConfig) throws IOException {
// info and configuration are of type Map<String, Object or String>
Map<String, Object> columns = prefix.readResults(result);
if (isConfig) {
for (Map.Entry<String, Object> column : columns.entrySet()) {
entity.addConfig(column.getKey(), column.getValue().toString());
}
} else {
entity.addInfo(columns);
}
}
/**
* Read events from the entity table or the application table. The column name
* is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
* if there is no info associated with the event.
*
* See {@link EntityTable} and {@link ApplicationTable} for a more detailed
* schema description.
*/
private static void readEvents(TimelineEntity entity, Result result,
boolean isApplication) throws IOException {
Map<String, TimelineEvent> eventsMap = new HashMap<>();
Map<?, Object> eventsResult = isApplication ?
ApplicationColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result) :
EntityColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result);
for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
byte[][] karr = (byte[][])eventResult.getKey();
// the column name is of the form "eventId=timestamp=infoKey"
if (karr.length == 3) {
String id = Bytes.toString(karr[0]);
long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
TimelineEvent event = eventsMap.get(key);
if (event == null) {
event = new TimelineEvent();
event.setId(id);
event.setTimestamp(ts);
eventsMap.put(key, event);
}
// handle empty info
String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
if (infoKey != null) {
event.addInfo(infoKey, eventResult.getValue());
}
} else {
LOG.warn("incorrectly formatted column name: it will be discarded");
continue;
}
}
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
entity.addEvents(eventsSet);
}
private static void readMetrics(TimelineEntity entity, Result result,
boolean isApplication) throws IOException {
NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
if (isApplication) {
metricsResult =
ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
} else {
metricsResult =
EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
}
for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
metricsResult.entrySet()) {
TimelineMetric metric = new TimelineMetric();
metric.setId(metricResult.getKey());
// Simply assume that if the value set contains more than 1 elements, the
// metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
metric.setType(metricResult.getValue().size() > 1 ?
TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
metric.addValues(metricResult.getValue());
entity.addMetric(metric);
}
} }
} }

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
/**
* The base class for reading and deserializing timeline entities from the
* HBase storage. Different types can be defined for different types of the
* entities that are being requested.
*/
abstract class TimelineEntityReader {
protected final boolean singleEntityRead;
protected String userId;
protected String clusterId;
protected String flowId;
protected Long flowRunId;
protected String appId;
protected String entityType;
protected EnumSet<Field> fieldsToRetrieve;
// used only for a single entity read mode
protected String entityId;
// used only for multiple entity read mode
protected Long limit;
protected Long createdTimeBegin;
protected Long createdTimeEnd;
protected Long modifiedTimeBegin;
protected Long modifiedTimeEnd;
protected Map<String, Set<String>> relatesTo;
protected Map<String, Set<String>> isRelatedTo;
protected Map<String, Object> infoFilters;
protected Map<String, String> configFilters;
protected Set<String> metricFilters;
protected Set<String> eventFilters;
/**
* Main table the entity reader uses.
*/
protected BaseTable<?> table;
/**
* Instantiates a reader for multiple-entity reads.
*/
protected TimelineEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
Long limit, Long createdTimeBegin, Long createdTimeEnd,
Long modifiedTimeBegin, Long modifiedTimeEnd,
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) {
this.singleEntityRead = false;
this.userId = userId;
this.clusterId = clusterId;
this.flowId = flowId;
this.flowRunId = flowRunId;
this.appId = appId;
this.entityType = entityType;
this.fieldsToRetrieve = fieldsToRetrieve;
this.limit = limit;
this.createdTimeBegin = createdTimeBegin;
this.createdTimeEnd = createdTimeEnd;
this.modifiedTimeBegin = modifiedTimeBegin;
this.modifiedTimeEnd = modifiedTimeEnd;
this.relatesTo = relatesTo;
this.isRelatedTo = isRelatedTo;
this.infoFilters = infoFilters;
this.configFilters = configFilters;
this.metricFilters = metricFilters;
this.eventFilters = eventFilters;
this.table = getTable();
}
/**
* Instantiates a reader for single-entity reads.
*/
protected TimelineEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
this.singleEntityRead = true;
this.userId = userId;
this.clusterId = clusterId;
this.flowId = flowId;
this.flowRunId = flowRunId;
this.appId = appId;
this.entityType = entityType;
this.fieldsToRetrieve = fieldsToRetrieve;
this.entityId = entityId;
this.table = getTable();
}
/**
* Reads and deserializes a single timeline entity from the HBase storage.
*/
public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
throws IOException {
validateParams();
augmentParams(hbaseConf, conn);
Result result = getResult(hbaseConf, conn);
return parseEntity(result);
}
/**
* Reads and deserializes a set of timeline entities from the HBase storage.
* It goes through all the results available, and returns the number of
* entries as specified in the limit in the entity's natural sort order.
*/
public Set<TimelineEntity> readEntities(Configuration hbaseConf,
Connection conn) throws IOException {
validateParams();
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
Iterable<Result> results = getResults(hbaseConf, conn);
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
if (entity == null) {
continue;
}
entities.add(entity);
if (entities.size() > limit) {
entities.pollLast();
}
}
return entities;
}
/**
* Returns the main table to be used by the entity reader.
*/
protected abstract BaseTable<?> getTable();
/**
* Validates the required parameters to read the entities.
*/
protected abstract void validateParams();
/**
* Sets certain parameters to defaults if the values are not provided.
*/
protected abstract void augmentParams(Configuration hbaseConf,
Connection conn) throws IOException;
/**
* Fetches a {@link Result} instance for a single-entity read.
*
* @return the {@link Result} instance or null if no such record is found.
*/
protected abstract Result getResult(Configuration hbaseConf, Connection conn)
throws IOException;
/**
* Fetches an iterator for {@link Result} instances for a multi-entity read.
*/
protected abstract Iterable<Result> getResults(Configuration hbaseConf,
Connection conn) throws IOException;
/**
* Given a {@link Result} instance, deserializes and creates a
* {@link TimelineEntity}.
*
* @return the {@link TimelineEntity} instance, or null if the {@link Result}
* is null or empty.
*/
protected abstract TimelineEntity parseEntity(Result result)
throws IOException;
/**
* Helper method for reading and deserializing {@link TimelineMetric} objects
* using the specified column prefix. The timeline metrics then are added to
* the given timeline entity.
*/
protected void readMetrics(TimelineEntity entity, Result result,
ColumnPrefix<?> columnPrefix) throws IOException {
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
columnPrefix.readResultsWithTimestamps(result);
for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
metricsResult.entrySet()) {
TimelineMetric metric = new TimelineMetric();
metric.setId(metricResult.getKey());
// Simply assume that if the value set contains more than 1 elements, the
// metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
metric.setType(metricResult.getValue().size() > 1 ?
TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
metric.addValues(metricResult.getValue());
entity.addMetric(metric);
}
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
/**
* Factory methods for instantiating a timeline entity reader.
*/
class TimelineEntityReaderFactory {
/**
* Creates a timeline entity reader instance for reading a single entity with
* the specified input.
*/
public static TimelineEntityReader createSingleEntityReader(String userId,
String clusterId, String flowId, Long flowRunId, String appId,
String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
}
}
/**
* Creates a timeline entity reader instance for reading set of entities with
* the specified input and predicates.
*/
public static TimelineEntityReader createMultipleEntitiesReader(String userId,
String clusterId, String flowId, Long flowRunId, String appId,
String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
Long modifiedTimeBegin, Long modifiedTimeEnd,
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve);
}
}
}

View File

@ -19,14 +19,46 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
/** /**
* Represents a rowkey for the application table. * Represents a rowkey for the application table.
*/ */
public class ApplicationRowKey { public class ApplicationRowKey {
// TODO: more methods are needed for this class. private final String clusterId;
private final String userId;
private final String flowId;
private final long flowRunId;
private final String appId;
// TODO: API needs to be cleaned up. public ApplicationRowKey(String clusterId, String userId, String flowId,
long flowRunId, String appId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowId = flowId;
this.flowRunId = flowRunId;
this.appId = appId;
}
public String getClusterId() {
return clusterId;
}
public String getUserId() {
return userId;
}
public String getFlowId() {
return flowId;
}
public long getFlowRunId() {
return flowRunId;
}
public String getAppId() {
return appId;
}
/** /**
* Constructs a row key for the application table as follows: * Constructs a row key for the application table as follows:
@ -46,22 +78,32 @@ public class ApplicationRowKey {
flowId)); flowId));
// Note that flowRunId is a long, so we can't encode them all at the same // Note that flowRunId is a long, so we can't encode them all at the same
// time. // time.
byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId)); byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
byte[] third = Bytes.toBytes(appId); byte[] third = Bytes.toBytes(appId);
return Separator.QUALIFIERS.join(first, second, third); return Separator.QUALIFIERS.join(first, second, third);
} }
/** /**
* Converts a timestamp into its inverse timestamp to be used in (row) keys * Given the raw row key as bytes, returns the row key as an object.
* where we want to have the most recent timestamp in the top of the table
* (scans start at the most recent timestamp first).
*
* @param key value to be inverted so that the latest version will be first in
* a scan.
* @return inverted long
*/ */
public static long invert(Long key) { public static ApplicationRowKey parseRowKey(byte[] rowKey) {
return Long.MAX_VALUE - key; byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
}
if (rowKeyComponents.length < 5) {
throw new IllegalArgumentException("the row key is not valid for " +
"an application");
}
String clusterId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
String userId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
String flowId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
String appId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
}
} }

View File

@ -24,6 +24,22 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
* Represents a rowkey for the app_flow table. * Represents a rowkey for the app_flow table.
*/ */
public class AppToFlowRowKey { public class AppToFlowRowKey {
private final String clusterId;
private final String appId;
public AppToFlowRowKey(String clusterId, String appId) {
this.clusterId = clusterId;
this.appId = appId;
}
public String getClusterId() {
return clusterId;
}
public String getAppId() {
return appId;
}
/** /**
* Constructs a row key prefix for the app_flow table as follows: * Constructs a row key prefix for the app_flow table as follows:
* {@code clusterId!AppId} * {@code clusterId!AppId}
@ -36,4 +52,19 @@ public class AppToFlowRowKey {
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId)); return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
} }
/**
* Given the raw row key as bytes, returns the row key as an object.
*/
public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 2) {
throw new IllegalArgumentException("the row key is not valid for " +
"the app-to-flow table");
}
String clusterId = Bytes.toString(rowKeyComponents[0]);
String appId = Bytes.toString(rowKeyComponents[1]);
return new AppToFlowRowKey(clusterId, appId);
}
} }

View File

@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
/** /**
* Implements behavior common to tables used in the timeline service storage. * Implements behavior common to tables used in the timeline service storage. It
* is thread-safe, and can be used by multiple threads concurrently.
* *
* @param <T> reference to the table instance class itself for type safety. * @param <T> reference to the table instance class itself for type safety.
*/ */

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.entity; package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
@ -26,9 +25,52 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
* Represents a rowkey for the entity table. * Represents a rowkey for the entity table.
*/ */
public class EntityRowKey { public class EntityRowKey {
// TODO: more methods are needed for this class. private final String clusterId;
private final String userId;
private final String flowId;
private final long flowRunId;
private final String appId;
private final String entityType;
private final String entityId;
// TODO: API needs to be cleaned up. public EntityRowKey(String clusterId, String userId, String flowId,
long flowRunId, String appId, String entityType, String entityId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowId = flowId;
this.flowRunId = flowRunId;
this.appId = appId;
this.entityType = entityType;
this.entityId = entityId;
}
public String getClusterId() {
return clusterId;
}
public String getUserId() {
return userId;
}
public String getFlowId() {
return flowId;
}
public long getFlowRunId() {
return flowRunId;
}
public String getAppId() {
return appId;
}
public String getEntityType() {
return entityType;
}
public String getEntityId() {
return entityId;
}
/** /**
* Constructs a row key prefix for the entity table as follows: * Constructs a row key prefix for the entity table as follows:
@ -106,4 +148,32 @@ public class EntityRowKey {
return Separator.QUALIFIERS.join(first, second, third); return Separator.QUALIFIERS.join(first, second, third);
} }
/**
* Given the raw row key as bytes, returns the row key as an object.
*/
public static EntityRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 7) {
throw new IllegalArgumentException("the row key is not valid for " +
"an entity");
}
String userId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
String clusterId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
String flowId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
String appId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
String entityType =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
String entityId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId,
entityType, entityId);
}
} }

View File

@ -55,6 +55,10 @@ public class FlowActivityRowKey {
return flowId; return flowId;
} }
public static byte[] getRowKeyPrefix(String clusterId) {
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
}
/** /**
* Constructs a row key for the flow activity table as follows: * Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowId} * {@code clusterId!dayTimestamp!user!flowId}
@ -65,7 +69,8 @@ public class FlowActivityRowKey {
* @param flowId * @param flowId
* @return byte array with the row key prefix * @return byte array with the row key prefix
*/ */
public static byte[] getRowKey(String clusterId, String userId, String flowId) { public static byte[] getRowKey(String clusterId, String userId,
String flowId) {
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis()); .currentTimeMillis());
return getRowKey(clusterId, dayTs, userId, flowId); return getRowKey(clusterId, dayTs, userId, flowId);

View File

@ -25,7 +25,34 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
* Represents a rowkey for the flow run table. * Represents a rowkey for the flow run table.
*/ */
public class FlowRunRowKey { public class FlowRunRowKey {
// TODO: more methods are needed for this class like parse row key private final String clusterId;
private final String userId;
private final String flowId;
private final long flowRunId;
public FlowRunRowKey(String clusterId, String userId, String flowId,
long flowRunId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowId = flowId;
this.flowRunId = flowRunId;
}
public String getClusterId() {
return clusterId;
}
public String getUserId() {
return userId;
}
public String getFlowId() {
return flowId;
}
public long getFlowRunId() {
return flowRunId;
}
/** /**
* Constructs a row key for the entity table as follows: { * Constructs a row key for the entity table as follows: {
@ -47,4 +74,25 @@ public class FlowRunRowKey {
return Separator.QUALIFIERS.join(first, second); return Separator.QUALIFIERS.join(first, second);
} }
/**
* Given the raw row key as bytes, returns the row key as an object.
*/
public static FlowRunRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 4) {
throw new IllegalArgumentException("the row key is not valid for " +
"a flow run");
}
String clusterId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
String userId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
String flowId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
}
} }

View File

@ -18,6 +18,15 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow; package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -33,15 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
/** /**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run * Invoked via the coprocessor when a Get or a Scan is issued for flow run
* table. Looks through the list of cells per row, checks their tags and does * table. Looks through the list of cells per row, checks their tags and does

View File

@ -508,32 +508,28 @@ public class TestHBaseTimelineStorage {
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, long runid, String appName, TimelineEntity te) { String flow, long runid, String appName, TimelineEntity te) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
assertTrue(rowKeyComponents.length == 7); assertEquals(user, key.getUserId());
assertEquals(user, Bytes.toString(rowKeyComponents[0])); assertEquals(cluster, key.getClusterId());
assertEquals(cluster, Bytes.toString(rowKeyComponents[1])); assertEquals(flow, key.getFlowId());
assertEquals(flow, Bytes.toString(rowKeyComponents[2])); assertEquals(runid, key.getFlowRunId());
assertEquals(TimelineWriterUtils.invert(runid), assertEquals(appName, key.getAppId());
Bytes.toLong(rowKeyComponents[3])); assertEquals(te.getType(), key.getEntityType());
assertEquals(appName, Bytes.toString(rowKeyComponents[4])); assertEquals(te.getId(), key.getEntityId());
assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
return true; return true;
} }
private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster, private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
String user, String flow, long runid, String appName) { String user, String flow, long runid, String appName) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
assertTrue(rowKeyComponents.length == 5); assertEquals(cluster, key.getClusterId());
assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); assertEquals(user, key.getUserId());
assertEquals(user, Bytes.toString(rowKeyComponents[1])); assertEquals(flow, key.getFlowId());
assertEquals(flow, Bytes.toString(rowKeyComponents[2])); assertEquals(runid, key.getFlowRunId());
assertEquals(TimelineWriterUtils.invert(runid), assertEquals(appName, key.getAppId());
Bytes.toLong(rowKeyComponents[3]));
assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
return true; return true;
} }

View File

@ -45,7 +45,7 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
Long cTime = 1425016501000L; long cTime = 1425016501000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
// add metrics // add metrics
@ -54,8 +54,8 @@ class TestFlowDataGenerator {
m1.setId(metric1); m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>(); Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
metricValues.put(ts - 100000, 2); metricValues.put(ts - 100000, 2L);
metricValues.put(ts - 80000, 40); metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES); m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues); m1.setValues(metricValues);
metrics.add(m1); metrics.add(m1);
@ -64,8 +64,8 @@ class TestFlowDataGenerator {
m2.setId(metric2); m2.setId(metric2);
metricValues = new HashMap<Long, Number>(); metricValues = new HashMap<Long, Number>();
ts = System.currentTimeMillis(); ts = System.currentTimeMillis();
metricValues.put(ts - 100000, 31); metricValues.put(ts - 100000, 31L);
metricValues.put(ts - 80000, 57); metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES); m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues); m2.setValues(metricValues);
metrics.add(m2); metrics.add(m2);
@ -80,7 +80,7 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
Long cTime = 1425016501000L; long cTime = 1425016501000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
// add metrics // add metrics
Set<TimelineMetric> metrics = new HashSet<>(); Set<TimelineMetric> metrics = new HashSet<>();
@ -103,8 +103,8 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
Long cTime = 20000000000000L; long cTime = 20000000000000L;
Long mTime = 1425026901000L; long mTime = 1425026901000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime); entity.setModifiedTime(mTime);
// add metrics // add metrics
@ -113,10 +113,10 @@ class TestFlowDataGenerator {
m1.setId(metric1); m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>(); Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
metricValues.put(ts - 120000, 100000000); metricValues.put(ts - 120000, 100000000L);
metricValues.put(ts - 100000, 200000000); metricValues.put(ts - 100000, 200000000L);
metricValues.put(ts - 80000, 300000000); metricValues.put(ts - 80000, 300000000L);
metricValues.put(ts - 60000, 400000000); metricValues.put(ts - 60000, 400000000L);
metricValues.put(ts - 40000, 50000000000L); metricValues.put(ts - 40000, 50000000000L);
metricValues.put(ts - 20000, 60000000000L); metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES); m1.setType(Type.TIME_SERIES);
@ -126,7 +126,7 @@ class TestFlowDataGenerator {
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
Long expTs = 1436512802000L; long expTs = 1436512802000L;
event.setTimestamp(expTs); event.setTimestamp(expTs);
String expKey = "foo_event"; String expKey = "foo_event";
Object expVal = "test"; Object expVal = "test";
@ -142,9 +142,9 @@ class TestFlowDataGenerator {
return entity; return entity;
} }
static TimelineEntity getEntityGreaterStartTime() { static TimelineEntity getEntityGreaterStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
entity.setCreatedTime(30000000000000L); entity.setCreatedTime(startTs);
entity.setId("flowRunHello with greater start time"); entity.setId("flowRunHello with greater start time");
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setType(type); entity.setType(type);
@ -173,14 +173,13 @@ class TestFlowDataGenerator {
return entity; return entity;
} }
static TimelineEntity getEntityMinStartTime() { static TimelineEntity getEntityMinStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
String id = "flowRunHelloMInStartTime"; String id = "flowRunHelloMInStartTime";
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
Long cTime = 10000000000000L; entity.setCreatedTime(startTs);
entity.setCreatedTime(cTime);
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(System.currentTimeMillis()); event.setTimestamp(System.currentTimeMillis());
@ -195,12 +194,12 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
Long cTime = 1425016501000L; long cTime = 1425016501000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
Long expTs = 1436512802000L; long expTs = 1436512802000L;
event.setTimestamp(expTs); event.setTimestamp(expTs);
String expKey = "foo_event"; String expKey = "foo_event";
Object expVal = "test"; Object expVal = "test";

View File

@ -21,19 +21,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -42,26 +39,17 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -119,11 +107,13 @@ public class TestHBaseStorageFlowActivity {
String user = "testWriteFlowRunMinMaxToHBase_user1"; String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name"; String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354"; String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L; long runid = 1002345678919L;
String appName = "application_100000000000_1111"; String appName = "application_100000000000_1111";
long minStartTs = 10000000000000L;
long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L; long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime(); .getEntityMinStartTime(minStartTs);
try { try {
hbi = new HBaseTimelineWriterImpl(c1); hbi = new HBaseTimelineWriterImpl(c1);
@ -146,7 +136,7 @@ public class TestHBaseStorageFlowActivity {
// writer another entity with greater start time // writer another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
.getEntityGreaterStartTime(); .getEntityGreaterStartTime(greaterStartTs);
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityGreaterStartTime); te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222"; appName = "application_1000000000000000_2222";
@ -181,6 +171,31 @@ public class TestHBaseStorageFlowActivity {
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size()); assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values); checkFlowActivityRunId(runid, flowVersion, values);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
// get the flow activity entity
Set<TimelineEntity> entities =
hbr.getEntities(null, cluster, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
assertEquals(cluster, flowActivity.getCluster());
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(1, flowRuns.size());
}
} finally {
hbr.close();
}
} }
/** /**
@ -193,7 +208,7 @@ public class TestHBaseStorageFlowActivity {
String user = "testWriteFlowActivityOneFlow_user1"; String user = "testWriteFlowActivityOneFlow_user1";
String flow = "flow_activity_test_flow_name"; String flow = "flow_activity_test_flow_name";
String flowVersion = "A122110F135BC4"; String flowVersion = "A122110F135BC4";
Long runid = 1001111178919L; long runid = 1001111178919L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1(); TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
@ -212,10 +227,35 @@ public class TestHBaseStorageFlowActivity {
} }
// check flow activity // check flow activity
checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1); checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
// use the reader to verify the data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
Set<TimelineEntity> entities =
hbr.getEntities(user, cluster, flow, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity entity = (FlowActivityEntity)e;
NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
assertEquals(1, flowRuns.size());
for (FlowRunEntity flowRun : flowRuns) {
assertEquals(runid, flowRun.getRunId());
assertEquals(flowVersion, flowRun.getVersion());
}
}
} finally {
hbr.close();
}
} }
private void checkFlowActivityTable(String cluster, String user, String flow, private void checkFlowActivityTable(String cluster, String user, String flow,
String flowVersion, Long runid, Configuration c1) throws IOException { String flowVersion, long runid, Configuration c1) throws IOException {
Scan s = new Scan(); Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
@ -263,7 +303,7 @@ public class TestHBaseStorageFlowActivity {
String user = "testManyRunsFlowActivity_c_user1"; String user = "testManyRunsFlowActivity_c_user1";
String flow = "flow_activity_test_flow_name"; String flow = "flow_activity_test_flow_name";
String flowVersion1 = "A122110F135BC4"; String flowVersion1 = "A122110F135BC4";
Long runid1 = 11111111111L; long runid1 = 11111111111L;
String flowVersion2 = "A12222222222C4"; String flowVersion2 = "A12222222222C4";
long runid2 = 2222222222222L; long runid2 = 2222222222222L;
@ -303,11 +343,50 @@ public class TestHBaseStorageFlowActivity {
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1, checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
runid1, flowVersion2, runid2, flowVersion3, runid3); runid1, flowVersion2, runid2, flowVersion3, runid3);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
Set<TimelineEntity> entities =
hbr.getEntities(null, cluster, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
assertEquals(cluster, flowActivity.getCluster());
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(3, flowRuns.size());
for (FlowRunEntity flowRun : flowRuns) {
long runId = flowRun.getRunId();
String version = flowRun.getVersion();
if (runId == runid1) {
assertEquals(flowVersion1, version);
} else if (runId == runid2) {
assertEquals(flowVersion2, version);
} else if (runId == runid3) {
assertEquals(flowVersion3, version);
} else {
fail("unknown run id: " + runId);
}
}
}
} finally {
hbr.close();
}
} }
private void checkFlowActivityTableSeveralRuns(String cluster, String user, private void checkFlowActivityTableSeveralRuns(String cluster, String user,
String flow, Configuration c1, String flowVersion1, Long runid1, String flow, Configuration c1, String flowVersion1, long runid1,
String flowVersion2, Long runid2, String flowVersion3, Long runid3) String flowVersion2, long runid2, String flowVersion3, long runid3)
throws IOException { throws IOException {
Scan s = new Scan(); Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
@ -351,7 +430,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(1, rowCount); assertEquals(1, rowCount);
} }
private void checkFlowActivityRunId(Long runid, String flowVersion, private void checkFlowActivityRunId(long runid, String flowVersion,
Map<byte[], byte[]> values) throws IOException { Map<byte[], byte[]> values) throws IOException {
byte[] rq = ColumnHelper.getColumnQualifier( byte[] rq = ColumnHelper.getColumnQualifier(
FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(), FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),

View File

@ -21,20 +21,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -42,32 +37,16 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -125,11 +104,13 @@ public class TestHBaseStorageFlowRun {
String user = "testWriteFlowRunMinMaxToHBase_user1"; String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name"; String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354"; String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L; long runid = 1002345678919L;
String appName = "application_100000000000_1111"; String appName = "application_100000000000_1111";
long minStartTs = 10000000000000L;
long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L; long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime(); .getEntityMinStartTime(minStartTs);
try { try {
hbi = new HBaseTimelineWriterImpl(c1); hbi = new HBaseTimelineWriterImpl(c1);
@ -152,7 +133,7 @@ public class TestHBaseStorageFlowRun {
// writer another entity with greater start time // writer another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
.getEntityGreaterStartTime(); .getEntityGreaterStartTime(greaterStartTs);
te = new TimelineEntities(); te = new TimelineEntities();
te.addEntity(entityGreaterStartTime); te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222"; appName = "application_1000000000000000_2222";
@ -183,24 +164,29 @@ public class TestHBaseStorageFlowRun {
.getBytes()); .getBytes());
assertEquals(2, r1.size()); assertEquals(2, r1.size());
Long starttime = (Long) GenericObjectMapper.read(values long starttime = (Long) GenericObjectMapper.read(values
.get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
Long expmin = entityMinStartTime.getCreatedTime(); assertEquals(minStartTs, starttime);
assertEquals(expmin, starttime);
assertEquals(endTs, GenericObjectMapper.read(values assertEquals(endTs, GenericObjectMapper.read(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
}
boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user, // use the timeline reader to verify data
String flow, Long runid) { HBaseTimelineReaderImpl hbr = null;
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1); try {
assertTrue(rowKeyComponents.length == 4); hbr = new HBaseTimelineReaderImpl();
assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); hbr.init(c1);
assertEquals(user, Bytes.toString(rowKeyComponents[1])); hbr.start();
assertEquals(flow, Bytes.toString(rowKeyComponents[2])); // get the flow run entity
assertEquals(TimelineWriterUtils.invert(runid), TimelineEntity entity =
Bytes.toLong(rowKeyComponents[3])); hbr.getEntity(user, cluster, flow, runid, null,
return true; TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
FlowRunEntity flowRun = (FlowRunEntity)entity;
assertEquals(minStartTs, flowRun.getStartTime());
assertEquals(endTs, flowRun.getMaxEndTime());
} finally {
hbr.close();
}
} }
/** /**
@ -218,7 +204,7 @@ public class TestHBaseStorageFlowRun {
String user = "testWriteFlowRunMetricsOneFlow_user1"; String user = "testWriteFlowRunMetricsOneFlow_user1";
String flow = "testing_flowRun_metrics_flow_name"; String flow = "testing_flowRun_metrics_flow_name";
String flowVersion = "CF7022C10F1354"; String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L; long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
@ -244,6 +230,41 @@ public class TestHBaseStorageFlowRun {
// check flow run // check flow run
checkFlowRunTable(cluster, user, flow, runid, c1); checkFlowRunTable(cluster, user, flow, runid, c1);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
TimelineEntity entity =
hbr.getEntity(user, cluster, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(2, metrics.size());
for (TimelineMetric metric : metrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case metric1:
assertEquals(141, value);
break;
case metric2:
assertEquals(57, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
} finally {
hbr.close();
}
} }
private void checkFlowRunTable(String cluster, String user, String flow, private void checkFlowRunTable(String cluster, String user, String flow,