From 10fa6da7d8a6013698767c6136ae20f0e04415e9 Mon Sep 17 00:00:00 2001
From: Vrushali
Date: Tue, 22 Sep 2015 13:42:30 -0700
Subject: [PATCH] YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)

---
 .../timelineservice/FlowActivityEntity.java        | 183 ++++++++
 .../{FlowEntity.java => FlowRunEntity.java}        |  50 +-
 .../timelineservice/TimelineEntityType.java        |  31 +-
 .../TestTimelineServiceRecords.java                |  14 +-
 .../TestTimelineServiceClientIntegration.java      |   2 +-
 .../TimelineCollectorWebService.java               |   6 +-
 .../storage/ApplicationEntityReader.java           | 229 +++++++++
 .../storage/FlowActivityEntityReader.java          | 168 +++++++
 .../storage/FlowRunEntityReader.java               | 136 ++++++
 .../storage/GenericEntityReader.java               | 389 ++++++++++++++++
 .../storage/HBaseTimelineReaderImpl.java           | 434 +-----------------
 .../storage/TimelineEntityReader.java              | 223 +++++++++
 .../storage/TimelineEntityReaderFactory.java       |  97 ++++
 .../application/ApplicationRowKey.java             |  68 ++-
 .../storage/apptoflow/AppToFlowRowKey.java         |  31 ++
 .../storage/common/BaseTable.java                  |   3 +-
 .../storage/entity/EntityRowKey.java               |  76 ++-
 .../storage/flow/FlowActivityRowKey.java           |   7 +-
 .../storage/flow/FlowRunRowKey.java                |  50 +-
 .../storage/flow/FlowScanner.java                  |  18 +-
 .../storage/TestHBaseTimelineStorage.java          |  32 +-
 .../storage/flow/TestFlowDataGenerator.java        |  39 +-
 .../flow/TestHBaseStorageFlowActivity.java         | 131 ++++--
 .../storage/flow/TestHBaseStorageFlowRun.java      | 105 +++--
 24 files changed, 1930 insertions(+), 592 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
 rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/{FlowEntity.java => FlowRunEntity.java} (71%)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
new file mode 100644
index 00000000000..163bd5c6967
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Entity that represents a record for flow activity. It's essentially a
+ * container entity for flow runs with limited information.
+ */
+@Public
+@Unstable
+public class FlowActivityEntity extends TimelineEntity {
+  public static final String CLUSTER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "CLUSTER";
+  public static final String DATE_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "DATE";
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+
+  private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>();
+
+  public FlowActivityEntity() {
+    super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    // set config to null
+    setConfigs(null);
+  }
+
+  public FlowActivityEntity(String cluster, long time, String user,
+      String flowName) {
+    this();
+    setCluster(cluster);
+    setDate(time);
+    setUser(user);
+    setFlowName(flowName);
+  }
+
+  public FlowActivityEntity(TimelineEntity entity) {
+    super(entity);
+    if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
+      throw new IllegalArgumentException("Incompatible entity type: " +
+          getId());
+    }
+    // set config to null
+    setConfigs(null);
+  }
+
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    // flow activity: cluster/day/user@flow_name
+    String id = super.getId();
+    if (id == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getCluster());
+      sb.append('/');
+      sb.append(getDate().getTime());
+      sb.append('/');
+      sb.append(getUser());
+      sb.append('@');
+      sb.append(getFlowName());
+      id = sb.toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  @Override
+  public int compareTo(TimelineEntity entity) {
+    int comparison = getType().compareTo(entity.getType());
+    if (comparison == 0) {
+      // order by cluster, date (descending), user, and flow name
+      FlowActivityEntity other = (FlowActivityEntity)entity;
+      int clusterComparison = getCluster().compareTo(other.getCluster());
+      if (clusterComparison != 0) {
+        return clusterComparison;
+      }
+      // use Long.compare rather than an int cast of the long difference,
+      // which could overflow for dates far apart
+      int dateComparisonDescending =
+          Long.compare(other.getDate().getTime(), getDate().getTime()); // descending
+
if (dateComparisonDescending != 0) { + return dateComparisonDescending; // descending + } + int userComparison = getUser().compareTo(other.getUser()); + if (userComparison != 0) { + return userComparison; + } + return getFlowName().compareTo(other.getFlowName()); + } else { + return comparison; + } + } + + /** + * Reuse the base class equals method. + */ + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + /** + * Reuse the base class hashCode method. + */ + @Override + public int hashCode() { + return super.hashCode(); + } + + public String getCluster() { + return (String)getInfo().get(CLUSTER_INFO_KEY); + } + + public void setCluster(String cluster) { + addInfo(CLUSTER_INFO_KEY, cluster); + } + + public Date getDate() { + return (Date)getInfo().get(DATE_INFO_KEY); + } + + public void setDate(long time) { + Date date = new Date(time); + addInfo(DATE_INFO_KEY, date); + } + + public String getUser() { + return (String)getInfo().get(USER_INFO_KEY); + } + + public void setUser(String user) { + addInfo(USER_INFO_KEY, user); + } + + public String getFlowName() { + return (String)getInfo().get(FLOW_NAME_INFO_KEY); + } + + public void setFlowName(String flowName) { + addInfo(FLOW_NAME_INFO_KEY, flowName); + } + + public void addFlowRun(FlowRunEntity run) { + flowRuns.add(run); + } + + public void addFlowRuns(Collection runs) { + flowRuns.addAll(runs); + } + + @XmlElement(name = "flowruns") + public NavigableSet getFlowRuns() { + return flowRuns; + } + + public int getNumberOfRuns() { + return flowRuns.size(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java similarity index 71% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java index 4554778d50f..3c3ffb42a19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.yarn.api.records.timelineservice; +import javax.xml.bind.annotation.XmlElement; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import javax.xml.bind.annotation.XmlElement; - @InterfaceAudience.Public @InterfaceStability.Unstable -public class FlowEntity extends HierarchicalTimelineEntity { +public class FlowRunEntity extends HierarchicalTimelineEntity { public static final String USER_INFO_KEY = TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER"; public static final String FLOW_NAME_INFO_KEY = @@ -33,22 +33,28 @@ public class FlowEntity extends HierarchicalTimelineEntity { TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION"; public static final String FLOW_RUN_ID_INFO_KEY = TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID"; + public static final String FLOW_RUN_END_TIME = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME"; - public FlowEntity() { - super(TimelineEntityType.YARN_FLOW.toString()); + public FlowRunEntity() { + 
super(TimelineEntityType.YARN_FLOW_RUN.toString()); + // set config to null + setConfigs(null); } - public FlowEntity(TimelineEntity entity) { + public FlowRunEntity(TimelineEntity entity) { super(entity); - if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) { + if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) { throw new IllegalArgumentException("Incompatible entity type: " + getId()); } + // set config to null + setConfigs(null); } @XmlElement(name = "id") @Override public String getId() { - //Flow id schema: user@flow_name(or id)/version/run_id + //Flow id schema: user@flow_name(or id)/run_id String id = super.getId(); if (id == null) { StringBuilder sb = new StringBuilder(); @@ -56,8 +62,6 @@ public class FlowEntity extends HierarchicalTimelineEntity { sb.append('@'); sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString()); sb.append('/'); - sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString()); - sb.append('/'); sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString()); id = sb.toString(); setId(id); @@ -66,8 +70,7 @@ public class FlowEntity extends HierarchicalTimelineEntity { } public String getUser() { - Object user = getInfo().get(USER_INFO_KEY); - return user == null ? null : user.toString(); + return (String)getInfo().get(USER_INFO_KEY); } public void setUser(String user) { @@ -75,8 +78,7 @@ public class FlowEntity extends HierarchicalTimelineEntity { } public String getName() { - Object name = getInfo().get(FLOW_NAME_INFO_KEY); - return name == null ? null : name.toString(); + return (String)getInfo().get(FLOW_NAME_INFO_KEY); } public void setName(String name) { @@ -84,8 +86,7 @@ public class FlowEntity extends HierarchicalTimelineEntity { } public String getVersion() { - Object version = getInfo().get(FLOW_VERSION_INFO_KEY); - return version == null ? null : version.toString(); + return (String)getInfo().get(FLOW_VERSION_INFO_KEY); } public void setVersion(String version) { @@ -100,4 +101,21 @@ public class FlowEntity extends HierarchicalTimelineEntity { public void setRunId(long runId) { addInfo(FLOW_RUN_ID_INFO_KEY, runId); } + + public long getStartTime() { + return getCreatedTime(); + } + + public void setStartTime(long startTime) { + setCreatedTime(startTime); + } + + public long getMaxEndTime() { + Object time = getInfo().get(FLOW_RUN_END_TIME); + return time == null ? 0L : (Long)time; + } + + public void setMaxEndTime(long endTime) { + addInfo(FLOW_RUN_END_TIME, endTime); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java index 6062fe19bc4..ba32e20bd86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java @@ -24,21 +24,25 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public enum TimelineEntityType { YARN_CLUSTER, - YARN_FLOW, + YARN_FLOW_RUN, YARN_APPLICATION, YARN_APPLICATION_ATTEMPT, YARN_CONTAINER, YARN_USER, - YARN_QUEUE; + YARN_QUEUE, + YARN_FLOW_ACTIVITY; + /** + * Whether the input type can be a parent of this entity. 
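+ * <p>A small illustrative check (the results follow directly from the
+ * switch cases below):
+ * <pre>{@code
+ * // an application's parent may be a flow run or a cluster
+ * assert TimelineEntityType.YARN_APPLICATION.isParent(
+ *     TimelineEntityType.YARN_FLOW_RUN);
+ * assert !TimelineEntityType.YARN_APPLICATION.isParent(
+ *     TimelineEntityType.YARN_CONTAINER);
+ * }</pre>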
+ */ public boolean isParent(TimelineEntityType type) { switch (this) { case YARN_CLUSTER: return false; - case YARN_FLOW: - return YARN_FLOW == type || YARN_CLUSTER == type; + case YARN_FLOW_RUN: + return YARN_FLOW_RUN == type || YARN_CLUSTER == type; case YARN_APPLICATION: - return YARN_FLOW == type || YARN_CLUSTER == type; + return YARN_FLOW_RUN == type || YARN_CLUSTER == type; case YARN_APPLICATION_ATTEMPT: return YARN_APPLICATION == type; case YARN_CONTAINER: @@ -50,12 +54,15 @@ public enum TimelineEntityType { } } + /** + * Whether the input type can be a child of this entity. + */ public boolean isChild(TimelineEntityType type) { switch (this) { case YARN_CLUSTER: - return YARN_FLOW == type || YARN_APPLICATION == type; - case YARN_FLOW: - return YARN_FLOW == type || YARN_APPLICATION == type; + return YARN_FLOW_RUN == type || YARN_APPLICATION == type; + case YARN_FLOW_RUN: + return YARN_FLOW_RUN == type || YARN_APPLICATION == type; case YARN_APPLICATION: return YARN_APPLICATION_ATTEMPT == type; case YARN_APPLICATION_ATTEMPT: @@ -68,4 +75,12 @@ public enum TimelineEntityType { return false; } } + + /** + * Whether the type of this entity matches the type indicated by the input + * argument. + */ + public boolean matches(String typeString) { + return toString().equals(typeString); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 78943e0558b..7c9acf281c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -182,14 +182,14 @@ public class TestTimelineServiceRecords { ClusterEntity cluster = new ClusterEntity(); cluster.setId("test cluster id"); - FlowEntity flow1 = new FlowEntity(); + FlowRunEntity flow1 = new FlowRunEntity(); //flow1.setId("test flow id 1"); flow1.setUser(user.getId()); flow1.setName("test flow name 1"); flow1.setVersion("test flow version 1"); flow1.setRunId(1L); - FlowEntity flow2 = new FlowEntity(); + FlowRunEntity flow2 = new FlowRunEntity(); //flow2.setId("test flow run id 2"); flow2.setUser(user.getId()); flow2.setName("test flow name 2"); @@ -213,19 +213,19 @@ public class TestTimelineServiceRecords { ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1).toString()); - cluster.addChild(TimelineEntityType.YARN_FLOW.toString(), flow1.getId()); + cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId()); flow1 .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId()); - flow1.addChild(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); - flow2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow1.getId()); + flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); + flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId()); flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId()); flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId()); - app1.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); + app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); 
app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttempt.getId()); appAttempt .setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId()); - app2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); + app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(), container.getId()); container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 69031a25d04..5672759a1ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -107,7 +107,7 @@ public class TestTimelineServiceClientIntegration { client.start(); ClusterEntity cluster = new ClusterEntity(); cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID); - FlowEntity flow = new FlowEntity(); + FlowRunEntity flow = new FlowRunEntity(); flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); flow.setName("test_flow_name"); flow.setVersion("test_flow_version"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 42fa365a7a7..8f595e25b13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEnti import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -205,8 +205,8 @@ public class TimelineCollectorWebService { case YARN_CLUSTER: entitiesToReturn.addEntity(new ClusterEntity(entity)); break; - case YARN_FLOW: - entitiesToReturn.addEntity(new FlowEntity(entity)); + case YARN_FLOW_RUN: + entitiesToReturn.addEntity(new FlowRunEntity(entity)); break; case YARN_APPLICATION: entitiesToReturn.addEntity(new 
ApplicationEntity(entity)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java new file mode 100644 index 00000000000..dfbc31d4ecc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; + +/** + * Timeline entity reader for application entities that are stored in the + * application table. 
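+ * <p>A sketch of how a caller might issue a single-application read,
+ * assuming the {@code TimelineEntityReader} base class exposes a
+ * {@code readEntity(Configuration, Connection)} entry point; all
+ * identifiers here are illustrative only:
+ * <pre>{@code
+ * TimelineEntityReader reader = new ApplicationEntityReader("user1",
+ *     "cluster1", "flow_name", 1002345678919L, "application_11111_2222",
+ *     TimelineEntityType.YARN_APPLICATION.toString(),
+ *     "application_11111_2222", EnumSet.of(Field.ALL));
+ * TimelineEntity app = reader.readEntity(hbaseConf, conn);
+ * }</pre>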
+ */
+class ApplicationEntityReader extends GenericEntityReader {
+  private static final ApplicationTable APPLICATION_TABLE =
+      new ApplicationTable();
+
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+
+  public ApplicationEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+        fieldsToRetrieve);
+  }
+
+  /**
+   * Uses the {@link ApplicationTable}.
+   */
+  protected BaseTable<?> getTable() {
+    return APPLICATION_TABLE;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey =
+        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
+            appId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return table.getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // If getEntities() is called for an application, there can be at most
+    // one entity. If the entity passes the filter, it is returned. Otherwise,
+    // an empty set is returned.
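+    // Note that a single Get suffices here: the row key built from
+    // (clusterId, userId, flowId, flowRunId, appId) fully identifies the
+    // application row, so no scan is needed.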
+ byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, + flowRunId, appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = table.getResult(hbaseConf, conn, get); + TimelineEntity entity = parseEntity(result); + Set set; + if (entity != null) { + set = Collections.singleton(result); + } else { + set = Collections.emptySet(); + } + return set; + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + String entityId = ApplicationColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = + (Number)ApplicationColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = + (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineReaderUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + if (checkInfo && + !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineReaderUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + 
entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, true); + if (checkEvents && !TimelineReaderUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, ApplicationColumnPrefix.METRIC); + if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java new file mode 100644 index 00000000000..d5ece2ed9ed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow activity entities that are stored in the + * flow activity table. + */ +class FlowActivityEntityReader extends TimelineEntityReader { + private static final FlowActivityTable FLOW_ACTIVITY_TABLE = + new FlowActivityTable(); + + public FlowActivityEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } + + public FlowActivityEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link FlowActivityTable}. + */ + @Override + protected BaseTable getTable() { + return FLOW_ACTIVITY_TABLE; + } + + /** + * Since this is strictly sorted by the row key, it is sufficient to collect + * the first results as specified by the limit. 
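+ * <p>For example, with {@code limit = 10} the loop below stops after the
+ * first ten flow activity rows have been parsed; the {@code PageFilter}
+ * set in {@code getResults} only bounds what the scanner returns, so the
+ * iteration still has to count entities itself.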
+ */ + @Override + public Set readEntities(Configuration hbaseConf, + Connection conn) throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + NavigableSet entities = new TreeSet<>(); + Iterable results = getResults(hbaseConf, conn); + for (Result result : results) { + TimelineEntity entity = parseEntity(result); + if (entity == null) { + continue; + } + entities.add(entity); + if (entities.size() == limit) { + break; + } + } + return entities; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected Iterable getResults(Configuration hbaseConf, + Connection conn) throws IOException { + Scan scan = new Scan(); + scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + // use the page filter to limit the result to the page size + // the scanner may still return more than the limit; therefore we need to + // read the right number as we iterate + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); + + long time = rowKey.getDayTimestamp(); + String user = rowKey.getUserId(); + String flowName = rowKey.getFlowId(); + + FlowActivityEntity flowActivity = + new FlowActivityEntity(clusterId, time, user, flowName); + // set the id + flowActivity.setId(flowActivity.getId()); + // get the list of run ids along with the version that are associated with + // this flow on this day + Map runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result); + for (Map.Entry e : runIdsMap.entrySet()) { + Long runId = Long.valueOf(e.getKey()); + String version = (String)e.getValue(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(user); + flowRun.setName(flowName); + flowRun.setRunId(runId); + flowRun.setVersion(version); + // set the id + flowRun.setId(flowRun.getId()); + flowActivity.addFlowRun(flowRun); + } + + return flowActivity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java new file mode 100644 index 00000000000..ced795db492 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. + */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } + + public FlowRunEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link FlowRunTable}. 
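+ * <p>This reader serves single flow run lookups only; {@code getResults}
+ * below deliberately throws {@link UnsupportedOperationException} for
+ * multi-entity queries. A sketch of a read (identifiers are illustrative,
+ * and the {@code readEntity} entry point on the base class is assumed):
+ * <pre>{@code
+ * TimelineEntityReader reader = new FlowRunEntityReader("user1",
+ *     "cluster1", "flow_name", 1002345678919L, null,
+ *     TimelineEntityType.YARN_FLOW_RUN.toString(), null,
+ *     EnumSet.noneOf(Field.class));
+ * TimelineEntity run = reader.readEntity(hbaseConf, conn);
+ * }</pre>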
+ */ + @Override + protected BaseTable getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); + Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) { + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected Iterable getResults(Configuration hbaseConf, + Connection conn) throws IOException { + throw new UnsupportedOperationException( + "multiple entity query is not supported"); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(userId); + flowRun.setName(flowId); + flowRun.setRunId(flowRunId); + + // read the start time + Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime); + } + // read the end time if available + Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime); + } + + // read the flow version + String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java new file mode 100644 index 00000000000..466914b2059 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for generic entities that are stored in the entity + * table. + */ +class GenericEntityReader extends TimelineEntityReader { + private static final EntityTable ENTITY_TABLE = new EntityTable(); + private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); + + private static final long DEFAULT_BEGIN_TIME = 0L; + private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + + /** + * Used to look up the flow context. 
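+ * When a query arrives with only (clusterId, appId), the flow ID and flow
+ * run ID are resolved from this table in {@code lookupFlowContext} before
+ * the entity table row key can be constructed.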
+ */ + private final AppToFlowTable appToFlowTable = new AppToFlowTable(); + + public GenericEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } + + public GenericEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link EntityTable}. + */ + protected BaseTable getTable() { + return ENTITY_TABLE; + } + + private FlowContext lookupFlowContext(String clusterId, String appId, + Configuration hbaseConf, Connection conn) throws IOException { + byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + Get get = new Get(rowKey); + Result result = appToFlowTable.getResult(hbaseConf, conn, get); + if (result != null && !result.isEmpty()) { + return new FlowContext( + AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + } else { + throw new IOException( + "Unable to find the context flow ID and flow run ID for clusterId=" + + clusterId + ", appId=" + appId); + } + } + + private static class FlowContext { + private final String flowId; + private final Long flowRunId; + public FlowContext(String flowId, Long flowRunId) { + this.flowId = flowId; + this.flowRunId = flowRunId; + } + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + // In reality both should be null or neither should be null + if (flowId == null || flowRunId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowId = context.flowId; + flowRunId = context.flowRunId; + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, + entityType, entityId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + 
return table.getResult(hbaseConf, conn, get); + } + + @Override + protected Iterable getResults(Configuration hbaseConf, + Connection conn) throws IOException { + // Scan through part of the table to find the entities belong to one app + // and one type + Scan scan = new Scan(); + scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( + clusterId, userId, flowId, flowRunId, appId, entityType)); + scan.setMaxVersions(Integer.MAX_VALUE); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + String entityType = EntityColumn.TYPE.readResult(result).toString(); + entity.setType(entityType); + String entityId = EntityColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo && !TimelineReaderUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); + if (checkInfo && + !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineReaderUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if 
(!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, false); + if (checkEvents && !TimelineReaderUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, EntityColumnPrefix.METRIC); + if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } + + /** + * Helper method for reading relationship. + */ + protected void readRelationship( + TimelineEntity entity, Result result, ColumnPrefix prefix, + boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map> + Map columns = prefix.readResults(result); + for (Map.Entry column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded( + column.getValue().toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Helper method for reading key-value pairs for either info or config. + */ + protected void readKeyValuePairs( + TimelineEntity entity, Result result, ColumnPrefix prefix, + boolean isConfig) throws IOException { + // info and configuration are of type Map + Map columns = prefix.readResults(result); + if (isConfig) { + for (Map.Entry column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * See {@link EntityTable} and {@link ApplicationTable} for a more detailed + * schema description. + */ + protected void readEvents(TimelineEntity entity, Result result, + boolean isApplication) throws IOException { + Map eventsMap = new HashMap<>(); + Map eventsResult = isApplication ? + ApplicationColumnPrefix.EVENT. + readResultsHavingCompoundColumnQualifiers(result) : + EntityColumnPrefix.EVENT. + readResultsHavingCompoundColumnQualifiers(result); + for (Map.Entry eventResult : eventsResult.entrySet()) { + byte[][] karr = (byte[][])eventResult.getKey(); + // the column name is of the form "eventId=timestamp=infoKey" + if (karr.length == 3) { + String id = Bytes.toString(karr[0]); + long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1])); + String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); + TimelineEvent event = eventsMap.get(key); + if (event == null) { + event = new TimelineEvent(); + event.setId(id); + event.setTimestamp(ts); + eventsMap.put(key, event); + } + // handle empty info + String infoKey = karr[2].length == 0 ? 
null : Bytes.toString(karr[2]); + if (infoKey != null) { + event.addInfo(infoKey, eventResult.getValue()); + } + } else { + LOG.warn("incorrectly formatted column name: it will be discarded"); + continue; + } + } + Set eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index c514c2080c3..889ae191612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -20,13 +20,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import java.io.IOException; import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,47 +29,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; - -import com.google.common.base.Preconditions; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { private static final Log LOG = LogFactory .getLog(HBaseTimelineReaderImpl.class); - private static final long DEFAULT_BEGIN_TIME = 0L; - private static final long DEFAULT_END_TIME = Long.MAX_VALUE; private Configuration hbaseConf = null; private Connection conn; - private EntityTable entityTable; - private AppToFlowTable appToFlowTable; - private ApplicationTable applicationTable; public HBaseTimelineReaderImpl() { super(HBaseTimelineReaderImpl.class.getName()); @@ -85,9 +50,6 @@ public class HBaseTimelineReaderImpl super.serviceInit(conf); hbaseConf = HBaseConfiguration.create(conf); conn = ConnectionFactory.createConnection(hbaseConf); - entityTable = new EntityTable(); - appToFlowTable = new AppToFlowTable(); - applicationTable = new ApplicationTable(); } @Override @@ -104,35 +66,10 @@ public class HBaseTimelineReaderImpl String flowId, Long flowRunId, String appId, String entityType, String entityId, EnumSet fieldsToRetrieve) throws IOException { - validateParams(userId, clusterId, appId, entityType, entityId, true); - // In reality both should be null or neither should be null - if (flowId == null || flowRunId == null) { - FlowContext context = lookupFlowContext(clusterId, appId); - flowId = context.flowId; - flowRunId = context.flowRunId; - } - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - - boolean isApplication = isApplicationEntity(entityType); - byte[] rowKey = isApplication ? - ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, - appId) : - EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, - entityType, entityId); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - Result result = isApplication ? 
-        applicationTable.getResult(hbaseConf, conn, get) :
-        entityTable.getResult(hbaseConf, conn, get);
-    return parseEntity(result, fieldsToRetrieve,
-        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
-        DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
-  }
-
-  private static boolean isApplicationEntity(String entityType) {
-    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
+            flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
   }
 
@@ -144,361 +81,12 @@ public class HBaseTimelineReaderImpl
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
-    validateParams(userId, clusterId, appId, entityType, null, false);
-    // In reality both should be null or neither should be null
-    if (flowId == null || flowRunId == null) {
-      FlowContext context = lookupFlowContext(clusterId, appId);
-      flowId = context.flowId;
-      flowRunId = context.flowRunId;
-    }
-    if (limit == null) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-    if (modifiedTimeBegin == null) {
-      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (modifiedTimeEnd == null) {
-      modifiedTimeEnd = DEFAULT_END_TIME;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    boolean isApplication = isApplicationEntity(entityType);
-    if (isApplication) {
-      // If getEntities() is called for an application, there can be at most
-      // one entity. If the entity passes the filter, it is returned. Otherwise,
-      // an empty set is returned.
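The getEntity()/getEntities() overrides above now funnel every query through TimelineEntityReaderFactory, which selects a reader implementation by entity type. A minimal sketch of a single-entity read under the new scheme; hbaseConf and conn are assumed to be an already-initialized Configuration and Connection, the identifier values are purely illustrative, and since both classes are package-private this would have to run inside the storage package, as the new getEntity() body does:

    // The factory routes YARN_APPLICATION to ApplicationEntityReader, the flow
    // types to their dedicated readers, and any other type to
    // GenericEntityReader, mirroring what getEntity() does internally.
    TimelineEntityReader reader =
        TimelineEntityReaderFactory.createSingleEntityReader(
            "user1", "cluster1", "flow_name", 1002345678919L,
            "application_100000000000_1111",
            TimelineEntityType.YARN_APPLICATION.toString(),
            "application_100000000000_1111",
            EnumSet.of(Field.INFO, Field.METRICS));
    TimelineEntity entity = reader.readEntity(hbaseConf, conn);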
- byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, - flowRunId, appId); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - Result result = applicationTable.getResult(hbaseConf, conn, get); - TimelineEntity entity = parseEntity(result, fieldsToRetrieve, - true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin, - modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters, - eventFilters, metricFilters, isApplication); - if (entity != null) { - entities.add(entity); - } - } else { - // Scan through part of the table to find the entities belong to one app - // and one type - Scan scan = new Scan(); - scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( - clusterId, userId, flowId, flowRunId, appId, entityType)); - scan.setMaxVersions(Integer.MAX_VALUE); - ResultScanner scanner = - entityTable.getResultScanner(hbaseConf, conn, scan); - for (Result result : scanner) { - TimelineEntity entity = parseEntity(result, fieldsToRetrieve, - true, createdTimeBegin, createdTimeEnd, - true, modifiedTimeBegin, modifiedTimeEnd, - isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters, - metricFilters, isApplication); - if (entity == null) { - continue; - } - if (entities.size() > limit) { - entities.pollLast(); - } - entities.add(entity); - } - } - return entities; - } - - private FlowContext lookupFlowContext(String clusterId, String appId) - throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); - Get get = new Get(rowKey); - Result result = appToFlowTable.getResult(hbaseConf, conn, get); - if (result != null && !result.isEmpty()) { - return new FlowContext( - AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); - } else { - throw new IOException( - "Unable to find the context flow ID and flow run ID for clusterId=" + - clusterId + ", appId=" + appId); - } - } - - private static class FlowContext { - private String flowId; - private Long flowRunId; - public FlowContext(String flowId, Long flowRunId) { - this.flowId = flowId; - this.flowRunId = flowRunId; - } - } - - private static void validateParams(String userId, String clusterId, - String appId, String entityType, String entityId, boolean checkEntityId) { - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(appId, "appId shouldn't be null"); - Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); - if (checkEntityId) { - Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); - } - } - - private static TimelineEntity parseEntity( - Result result, EnumSet fieldsToRetrieve, - boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd, - boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd, - Map> isRelatedTo, Map> relatesTo, - Map infoFilters, Map configFilters, - Set eventFilters, Set metricFilters, - boolean isApplication) - throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - String entityType = isApplication ? - TimelineEntityType.YARN_APPLICATION.toString() : - EntityColumn.TYPE.readResult(result).toString(); - entity.setType(entityType); - String entityId = isApplication ? 
- ApplicationColumn.ID.readResult(result).toString() : - EntityColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - // fetch created time - Number createdTime = isApplication ? - (Number)ApplicationColumn.CREATED_TIME.readResult(result) : - (Number)EntityColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime.longValue()); - if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { - return null; - } - - // fetch modified time - Number modifiedTime = isApplication ? - (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) : - (Number)EntityColumn.MODIFIED_TIME.readResult(result); - entity.setModifiedTime(modifiedTime.longValue()); - if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin || - entity.getModifiedTime() > modifiedTimeEnd)) { - return null; - } - - // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - if (isApplication) { - readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, - true); - } else { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, - true); - } - if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - if (isApplication) { - readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, - false); - } else { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - } - if (checkRelatesTo && !TimelineReaderUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - if (isApplication) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); - } else { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); - } - if (checkInfo && - !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } - } - - // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - if (isApplication) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); - } else { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - } - if (checkConfigs && !TimelineReaderUtils.matchFilters( - entity.getConfigs(), configFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } - } - - // fetch events - boolean 
checkEvents = eventFilters != null && eventFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, isApplication); - if (checkEvents && !TimelineReaderUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { - readMetrics(entity, result, isApplication); - if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } - } - return entity; - } - - private static void readRelationship( - TimelineEntity entity, Result result, ColumnPrefix prefix, - boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map> - Map columns = prefix.readResults(result); - for (Map.Entry column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded( - column.getValue().toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - private static void readKeyValuePairs( - TimelineEntity entity, Result result, ColumnPrefix prefix, - boolean isConfig) throws IOException { - // info and configuration are of type Map - Map columns = prefix.readResults(result); - if (isConfig) { - for (Map.Entry column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } - - /** - * Read events from the entity table or the application table. The column name - * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted - * if there is no info associated with the event. - * - * See {@link EntityTable} and {@link ApplicationTable} for a more detailed - * schema description. - */ - private static void readEvents(TimelineEntity entity, Result result, - boolean isApplication) throws IOException { - Map eventsMap = new HashMap<>(); - Map eventsResult = isApplication ? - ApplicationColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result) : - EntityColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result); - for (Map.Entry eventResult : eventsResult.entrySet()) { - byte[][] karr = (byte[][])eventResult.getKey(); - // the column name is of the form "eventId=timestamp=infoKey" - if (karr.length == 3) { - String id = Bytes.toString(karr[0]); - long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1])); - String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); - TimelineEvent event = eventsMap.get(key); - if (event == null) { - event = new TimelineEvent(); - event.setId(id); - event.setTimestamp(ts); - eventsMap.put(key, event); - } - // handle empty info - String infoKey = karr[2].length == 0 ? 
null : Bytes.toString(karr[2]); - if (infoKey != null) { - event.addInfo(infoKey, eventResult.getValue()); - } - } else { - LOG.warn("incorrectly formatted column name: it will be discarded"); - continue; - } - } - Set eventsSet = new HashSet<>(eventsMap.values()); - entity.addEvents(eventsSet); - } - - private static void readMetrics(TimelineEntity entity, Result result, - boolean isApplication) throws IOException { - NavigableMap> metricsResult; - if (isApplication) { - metricsResult = - ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); - } else { - metricsResult = - EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); - } - for (Map.Entry> metricResult: - metricsResult.entrySet()) { - TimelineMetric metric = new TimelineMetric(); - metric.setId(metricResult.getKey()); - // Simply assume that if the value set contains more than 1 elements, the - // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric - metric.setType(metricResult.getValue().size() > 1 ? - TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE); - metric.addValues(metricResult.getValue()); - entity.addMetric(metric); - } + TimelineEntityReader reader = + TimelineEntityReaderFactory.createMultipleEntitiesReader(userId, + clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, + modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, + metricFilters, eventFilters, fieldsToRetrieve); + return reader.readEntities(hbaseConf, conn); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java new file mode 100644 index 00000000000..0d1134c8d53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Subclasses can be defined for the different types of
+ * entities that are being requested.
+ */
+abstract class TimelineEntityReader {
+  protected final boolean singleEntityRead;
+
+  protected String userId;
+  protected String clusterId;
+  protected String flowId;
+  protected Long flowRunId;
+  protected String appId;
+  protected String entityType;
+  protected EnumSet<Field> fieldsToRetrieve;
+  // used only for a single entity read mode
+  protected String entityId;
+  // used only for multiple entity read mode
+  protected Long limit;
+  protected Long createdTimeBegin;
+  protected Long createdTimeEnd;
+  protected Long modifiedTimeBegin;
+  protected Long modifiedTimeEnd;
+  protected Map<String, Set<String>> relatesTo;
+  protected Map<String, Set<String>> isRelatedTo;
+  protected Map<String, Object> infoFilters;
+  protected Map<String, String> configFilters;
+  protected Set<String> metricFilters;
+  protected Set<String> eventFilters;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  protected BaseTable<?> table;
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = false;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.limit = limit;
+    this.createdTimeBegin = createdTimeBegin;
+    this.createdTimeEnd = createdTimeEnd;
+    this.modifiedTimeBegin = modifiedTimeBegin;
+    this.modifiedTimeEnd = modifiedTimeEnd;
+    this.relatesTo = relatesTo;
+    this.isRelatedTo = isRelatedTo;
+    this.infoFilters = infoFilters;
+    this.configFilters = configFilters;
+    this.metricFilters = metricFilters;
+    this.eventFilters = eventFilters;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = true;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.entityId = entityId;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    Result result = getResult(hbaseConf, conn);
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * It iterates over all available results and returns at most the number of
+   * entries specified by the limit, in the entities' natural sort order.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Iterable<Result> results = getResults(hbaseConf, conn);
+    for (Result result : results) {
+      TimelineEntity entity = parseEntity(result);
+      if (entity == null) {
+        continue;
+      }
+      entities.add(entity);
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+    }
+    return entities;
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   */
+  protected abstract BaseTable<?> getTable();
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @return the {@link Result} instance or null if no such record is found.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn)
+      throws IOException;
+
+  /**
+   * Fetches an iterator for {@link Result} instances for a multi-entity read.
+   */
+  protected abstract Iterable<Result> getResults(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Given a {@link Result} instance, deserializes and creates a
+   * {@link TimelineEntity}.
+   *
+   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
+   * is null or empty.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Helper method for reading and deserializing {@link TimelineMetric} objects
+   * using the specified column prefix. The timeline metrics are then added to
+   * the given timeline entity.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        columnPrefix.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than 1 element, the
+      // metric is a TIME_SERIES metric; otherwise, it's a SINGLE_VALUE metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
new file mode 100644
index 00000000000..4fdef40ebc4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+class TimelineEntityReaderFactory {
+  /**
+   * Creates a timeline entity reader instance for reading a single entity
+   * with the specified input.
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a set of entities
+   * with the specified input and predicates.
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 5f3868b5503..e3b5a87ecdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,14 +19,46 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 
 /**
  * Represents a rowkey for the application table.
  */
 public class ApplicationRowKey {
-  // TODO: more methods are needed for this class.
+  private final String clusterId;
+  private final String userId;
+  private final String flowId;
+  private final long flowRunId;
+  private final String appId;
 
-  // TODO: API needs to be cleaned up.
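The ApplicationRowKey changes below pair the existing getRowKey() encoder with a new parseRowKey() decoder, so application row keys become round-trippable. A brief sketch with illustrative values; note that the flow run ID is stored inverted via TimelineWriterUtils.invert(), i.e. Long.MAX_VALUE - flowRunId, so that later runs sort first, and parseRowKey() applies the same inversion again to recover the original value:

    // Encode clusterId!userId!flowId!inverted(flowRunId)!appId, then parse it
    // back into its components.
    byte[] rowKey = ApplicationRowKey.getRowKey(
        "cluster1", "user1", "flow_name", 1002345678919L,
        "application_100000000000_1111");
    ApplicationRowKey parsed = ApplicationRowKey.parseRowKey(rowKey);
    assert parsed.getFlowRunId() == 1002345678919L; // invert(invert(x)) == x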
+ public ApplicationRowKey(String clusterId, String userId, String flowId, + long flowRunId, String appId) { + this.clusterId = clusterId; + this.userId = userId; + this.flowId = flowId; + this.flowRunId = flowRunId; + this.appId = appId; + } + + public String getClusterId() { + return clusterId; + } + + public String getUserId() { + return userId; + } + + public String getFlowId() { + return flowId; + } + + public long getFlowRunId() { + return flowRunId; + } + + public String getAppId() { + return appId; + } /** * Constructs a row key for the application table as follows: @@ -46,22 +78,32 @@ public class ApplicationRowKey { flowId)); // Note that flowRunId is a long, so we can't encode them all at the same // time. - byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId)); + byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); byte[] third = Bytes.toBytes(appId); return Separator.QUALIFIERS.join(first, second, third); } /** - * Converts a timestamp into its inverse timestamp to be used in (row) keys - * where we want to have the most recent timestamp in the top of the table - * (scans start at the most recent timestamp first). - * - * @param key value to be inverted so that the latest version will be first in - * a scan. - * @return inverted long + * Given the raw row key as bytes, returns the row key as an object. */ - public static long invert(Long key) { - return Long.MAX_VALUE - key; - } + public static ApplicationRowKey parseRowKey(byte[] rowKey) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + if (rowKeyComponents.length < 5) { + throw new IllegalArgumentException("the row key is not valid for " + + "an application"); + } + + String clusterId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0])); + String userId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1])); + String flowId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); + long flowRunId = + TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3])); + String appId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4])); + return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java index ad4fec6fdf9..ca8805662b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -24,6 +24,22 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; * Represents a rowkey for the app_flow table. 
*/ public class AppToFlowRowKey { + private final String clusterId; + private final String appId; + + public AppToFlowRowKey(String clusterId, String appId) { + this.clusterId = clusterId; + this.appId = appId; + } + + public String getClusterId() { + return clusterId; + } + + public String getAppId() { + return appId; + } + /** * Constructs a row key prefix for the app_flow table as follows: * {@code clusterId!AppId} @@ -36,4 +52,19 @@ public class AppToFlowRowKey { return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId)); } + /** + * Given the raw row key as bytes, returns the row key as an object. + */ + public static AppToFlowRowKey parseRowKey(byte[] rowKey) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + + if (rowKeyComponents.length < 2) { + throw new IllegalArgumentException("the row key is not valid for " + + "the app-to-flow table"); + } + + String clusterId = Bytes.toString(rowKeyComponents[0]); + String appId = Bytes.toString(rowKeyComponents[1]); + return new AppToFlowRowKey(clusterId, appId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index abba79a56fc..95454382fb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; /** - * Implements behavior common to tables used in the timeline service storage. + * Implements behavior common to tables used in the timeline service storage. It + * is thread-safe, and can be used by multiple threads concurrently. * * @param reference to the table instance class itself for type safety. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 9a72be04176..6a534ed73c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; @@ -26,9 +25,52 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit * Represents a rowkey for the entity table. 
*/ public class EntityRowKey { - // TODO: more methods are needed for this class. + private final String clusterId; + private final String userId; + private final String flowId; + private final long flowRunId; + private final String appId; + private final String entityType; + private final String entityId; - // TODO: API needs to be cleaned up. + public EntityRowKey(String clusterId, String userId, String flowId, + long flowRunId, String appId, String entityType, String entityId) { + this.clusterId = clusterId; + this.userId = userId; + this.flowId = flowId; + this.flowRunId = flowRunId; + this.appId = appId; + this.entityType = entityType; + this.entityId = entityId; + } + + public String getClusterId() { + return clusterId; + } + + public String getUserId() { + return userId; + } + + public String getFlowId() { + return flowId; + } + + public long getFlowRunId() { + return flowRunId; + } + + public String getAppId() { + return appId; + } + + public String getEntityType() { + return entityType; + } + + public String getEntityId() { + return entityId; + } /** * Constructs a row key prefix for the entity table as follows: @@ -106,4 +148,32 @@ public class EntityRowKey { return Separator.QUALIFIERS.join(first, second, third); } + /** + * Given the raw row key as bytes, returns the row key as an object. + */ + public static EntityRowKey parseRowKey(byte[] rowKey) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + + if (rowKeyComponents.length < 7) { + throw new IllegalArgumentException("the row key is not valid for " + + "an entity"); + } + + String userId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0])); + String clusterId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1])); + String flowId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); + long flowRunId = + TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3])); + String appId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4])); + String entityType = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5])); + String entityId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6])); + return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId, + entityType, entityId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index 19e4e83f5bd..18ca5990136 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -55,6 +55,10 @@ public class FlowActivityRowKey { return flowId; } + public static byte[] getRowKeyPrefix(String clusterId) { + return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, "")); + } + /** * Constructs a row key for the flow activity table as follows: * {@code clusterId!dayTimestamp!user!flowId} @@ -65,7 +69,8 @@ public class FlowActivityRowKey { * @param flowId * @return byte array with the row key prefix */ - public static byte[] 
getRowKey(String clusterId, String userId, String flowId) { + public static byte[] getRowKey(String clusterId, String userId, + String flowId) { long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System .currentTimeMillis()); return getRowKey(clusterId, dayTs, userId, flowId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java index e133241ba1f..880d481acfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java @@ -25,7 +25,34 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit * Represents a rowkey for the flow run table. */ public class FlowRunRowKey { - // TODO: more methods are needed for this class like parse row key + private final String clusterId; + private final String userId; + private final String flowId; + private final long flowRunId; + + public FlowRunRowKey(String clusterId, String userId, String flowId, + long flowRunId) { + this.clusterId = clusterId; + this.userId = userId; + this.flowId = flowId; + this.flowRunId = flowRunId; + } + + public String getClusterId() { + return clusterId; + } + + public String getUserId() { + return userId; + } + + public String getFlowId() { + return flowId; + } + + public long getFlowRunId() { + return flowRunId; + } /** * Constructs a row key for the entity table as follows: { @@ -47,4 +74,25 @@ public class FlowRunRowKey { return Separator.QUALIFIERS.join(first, second); } + /** + * Given the raw row key as bytes, returns the row key as an object. 
+ */ + public static FlowRunRowKey parseRowKey(byte[] rowKey) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + + if (rowKeyComponents.length < 4) { + throw new IllegalArgumentException("the row key is not valid for " + + "a flow run"); + } + + String clusterId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0])); + String userId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1])); + String flowId = + Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); + long flowRunId = + TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3])); + return new FlowRunRowKey(clusterId, userId, flowId, flowRunId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index a1948aa76a8..651bb3ab66a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -18,6 +18,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; @@ -33,15 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - /** * Invoked via the coprocessor when a Get or a Scan is issued for flow run * table. 
Looks through the list of cells per row, checks their tags and does diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 3962341bccf..01920b3603b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -508,32 +508,28 @@ public class TestHBaseTimelineStorage { private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, String flow, long runid, String appName, TimelineEntity te) { - byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + EntityRowKey key = EntityRowKey.parseRowKey(rowKey); - assertTrue(rowKeyComponents.length == 7); - assertEquals(user, Bytes.toString(rowKeyComponents[0])); - assertEquals(cluster, Bytes.toString(rowKeyComponents[1])); - assertEquals(flow, Bytes.toString(rowKeyComponents[2])); - assertEquals(TimelineWriterUtils.invert(runid), - Bytes.toLong(rowKeyComponents[3])); - assertEquals(appName, Bytes.toString(rowKeyComponents[4])); - assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5])); - assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6])); + assertEquals(user, key.getUserId()); + assertEquals(cluster, key.getClusterId()); + assertEquals(flow, key.getFlowId()); + assertEquals(runid, key.getFlowRunId()); + assertEquals(appName, key.getAppId()); + assertEquals(te.getType(), key.getEntityType()); + assertEquals(te.getId(), key.getEntityId()); return true; } private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster, String user, String flow, long runid, String appName) { - byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey); - assertTrue(rowKeyComponents.length == 5); - assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); - assertEquals(user, Bytes.toString(rowKeyComponents[1])); - assertEquals(flow, Bytes.toString(rowKeyComponents[2])); - assertEquals(TimelineWriterUtils.invert(runid), - Bytes.toLong(rowKeyComponents[3])); - assertEquals(appName, Bytes.toString(rowKeyComponents[4])); + assertEquals(cluster, key.getClusterId()); + assertEquals(user, key.getUserId()); + assertEquals(flow, key.getFlowId()); + assertEquals(runid, key.getFlowRunId()); + assertEquals(appName, key.getAppId()); return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java index f8331fa9b2f..d18613aed7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java @@ -45,7 +45,7 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 1425016501000L; + long cTime = 1425016501000L; entity.setCreatedTime(cTime); // add metrics @@ -54,8 +54,8 @@ class TestFlowDataGenerator { m1.setId(metric1); Map metricValues = new HashMap(); long ts = System.currentTimeMillis(); - metricValues.put(ts - 100000, 2); - metricValues.put(ts - 80000, 40); + metricValues.put(ts - 100000, 2L); + metricValues.put(ts - 80000, 40L); m1.setType(Type.TIME_SERIES); m1.setValues(metricValues); metrics.add(m1); @@ -64,8 +64,8 @@ class TestFlowDataGenerator { m2.setId(metric2); metricValues = new HashMap(); ts = System.currentTimeMillis(); - metricValues.put(ts - 100000, 31); - metricValues.put(ts - 80000, 57); + metricValues.put(ts - 100000, 31L); + metricValues.put(ts - 80000, 57L); m2.setType(Type.TIME_SERIES); m2.setValues(metricValues); metrics.add(m2); @@ -80,7 +80,7 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 1425016501000L; + long cTime = 1425016501000L; entity.setCreatedTime(cTime); // add metrics Set metrics = new HashSet<>(); @@ -103,8 +103,8 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 20000000000000L; - Long mTime = 1425026901000L; + long cTime = 20000000000000L; + long mTime = 1425026901000L; entity.setCreatedTime(cTime); entity.setModifiedTime(mTime); // add metrics @@ -113,10 +113,10 @@ class TestFlowDataGenerator { m1.setId(metric1); Map metricValues = new HashMap(); long ts = System.currentTimeMillis(); - metricValues.put(ts - 120000, 100000000); - metricValues.put(ts - 100000, 200000000); - metricValues.put(ts - 80000, 300000000); - metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 120000, 100000000L); + metricValues.put(ts - 100000, 200000000L); + metricValues.put(ts - 80000, 300000000L); + metricValues.put(ts - 60000, 400000000L); metricValues.put(ts - 40000, 50000000000L); metricValues.put(ts - 20000, 60000000000L); m1.setType(Type.TIME_SERIES); @@ -126,7 +126,7 @@ class TestFlowDataGenerator { TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - Long expTs = 1436512802000L; + long expTs = 1436512802000L; event.setTimestamp(expTs); String expKey = "foo_event"; Object expVal = "test"; @@ -142,9 +142,9 @@ class TestFlowDataGenerator { return entity; } - static TimelineEntity getEntityGreaterStartTime() { + static TimelineEntity getEntityGreaterStartTime(long startTs) { TimelineEntity entity = new TimelineEntity(); - entity.setCreatedTime(30000000000000L); + entity.setCreatedTime(startTs); entity.setId("flowRunHello with greater start time"); String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setType(type); @@ -173,14 +173,13 @@ class TestFlowDataGenerator { return entity; } - static TimelineEntity getEntityMinStartTime() { + static TimelineEntity getEntityMinStartTime(long startTs) { TimelineEntity entity = new TimelineEntity(); String id = "flowRunHelloMInStartTime"; String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 
10000000000000L; - entity.setCreatedTime(cTime); + entity.setCreatedTime(startTs); TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); event.setTimestamp(System.currentTimeMillis()); @@ -195,12 +194,12 @@ class TestFlowDataGenerator { String type = TimelineEntityType.YARN_APPLICATION.toString(); entity.setId(id); entity.setType(type); - Long cTime = 1425016501000L; + long cTime = 1425016501000L; entity.setCreatedTime(cTime); TimelineEvent event = new TimelineEvent(); event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); - Long expTs = 1436512802000L; + long expTs = 1436512802000L; event.setTimestamp(expTs); String expKey = "foo_event"; Object expVal = "test"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index b4a0c74db06..6bdec6b03ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -21,19 +21,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -42,26 +39,17 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import 
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -119,11 +107,13 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);
 
     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -146,7 +136,7 @@ public class TestHBaseStorageFlowActivity {
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -181,6 +171,31 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow activity entity
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+          null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   /**
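
The reader verification added above is the pattern the rest of these tests repeat: build an HBaseTimelineReaderImpl against the same mini-cluster Configuration used for writing, init and start it, query, then close it in a finally block. Condensed into one helper for reference; this is a sketch rather than patch code, and the null guard before close() is a defensive variation (the tests call close() unconditionally):

    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
    import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;

    class FlowActivityReadSketch {
      // Reads up to 10 flow activity entities for a cluster; every other
      // positional filter of getEntities is passed as null (match all),
      // exactly as in the test code above.
      static Set<TimelineEntity> readFlowActivity(Configuration conf,
          String cluster) throws Exception {
        HBaseTimelineReaderImpl hbr = null;
        try {
          hbr = new HBaseTimelineReaderImpl();
          hbr.init(conf);
          hbr.start();
          return hbr.getEntities(null, cluster, null, null, null,
              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
              null, null, null, null, null, null, null, null, null);
        } finally {
          if (hbr != null) {
            hbr.close();
          }
        }
      }
    }
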
@@ -193,7 +208,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testWriteFlowActivityOneFlow_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion = "A122110F135BC4";
-    Long runid = 1001111178919L;
+    long runid = 1001111178919L;
 
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
@@ -212,10 +227,35 @@ public class TestHBaseStorageFlowActivity {
     }
     // check flow activity
     checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+
+    // use the reader to verify the data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(user, cluster, flow, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+          null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity entity = (FlowActivityEntity)e;
+        NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
+        assertEquals(1, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          assertEquals(runid, flowRun.getRunId());
+          assertEquals(flowVersion, flowRun.getVersion());
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, Long runid, Configuration c1) throws IOException {
+      String flowVersion, long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
     byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
@@ -263,7 +303,7 @@ public class TestHBaseStorageFlowActivity {
     String user = "testManyRunsFlowActivity_c_user1";
     String flow = "flow_activity_test_flow_name";
     String flowVersion1 = "A122110F135BC4";
-    Long runid1 = 11111111111L;
+    long runid1 = 11111111111L;
 
     String flowVersion2 = "A12222222222C4";
     long runid2 = 2222222222222L;
@@ -303,11 +343,50 @@ public class TestHBaseStorageFlowActivity {
     checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
         runid1, flowVersion2, runid2, flowVersion3, runid3);
 
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+
+      Set<TimelineEntity> entities =
+          hbr.getEntities(null, cluster, null, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+          null, null, null, null, null, null, null, null, null);
+      assertEquals(1, entities.size());
+      for (TimelineEntity e : entities) {
+        FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+        assertEquals(cluster, flowActivity.getCluster());
+        assertEquals(user, flowActivity.getUser());
+        assertEquals(flow, flowActivity.getFlowName());
+        long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+            .currentTimeMillis());
+        assertEquals(dayTs, flowActivity.getDate().getTime());
+        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+        assertEquals(3, flowRuns.size());
+        for (FlowRunEntity flowRun : flowRuns) {
+          long runId = flowRun.getRunId();
+          String version = flowRun.getVersion();
+          if (runId == runid1) {
+            assertEquals(flowVersion1, version);
+          } else if (runId == runid2) {
+            assertEquals(flowVersion2, version);
+          } else if (runId == runid3) {
+            assertEquals(flowVersion3, version);
+          } else {
+            fail("unknown run id: " + runId);
+          }
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowActivityTableSeveralRuns(String cluster, String user,
-      String flow, Configuration c1, String flowVersion1, Long runid1,
-      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      String flow, Configuration c1, String flowVersion1, long runid1,
+      String flowVersion2, long runid2, String flowVersion3, long runid3)
       throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
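
Both reader checks compare the activity date against TimelineWriterUtils.getTopOfTheDayTimestamp(System.currentTimeMillis()), since the flow activity table keeps one row per flow per day. The utility's body is outside this patch; a plausible equivalent, assumed rather than copied and shown only for orientation, truncates an epoch-millisecond timestamp to midnight UTC:

    class DayTimestampSketch {
      private static final long MILLIS_ONE_DAY = 86400000L;

      // Truncate an epoch-millisecond timestamp to 00:00:00 UTC of its day.
      static long topOfTheDay(long epochMillis) {
        return epochMillis - (epochMillis % MILLIS_ONE_DAY);
      }

      public static void main(String[] args) {
        // 1436512802000L (a timestamp from the test data) is
        // Fri, 10 Jul 2015 07:20:02 UTC
        System.out.println(topOfTheDay(1436512802000L)); // 1436486400000
      }
    }
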
@@ -351,7 +430,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(1, rowCount);
   }
 
-  private void checkFlowActivityRunId(Long runid, String flowVersion,
+  private void checkFlowActivityRunId(long runid, String flowVersion,
       Map<byte[], byte[]> values) throws IOException {
     byte[] rq = ColumnHelper.getColumnQualifier(
         FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
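
The checkFlowActivityRunId helper above resolves the cell for one run by composing a column qualifier from the RUN_ID column prefix. The hunk is cut off before the second argument of getColumnQualifier, so the qualifier suffix below (the run id rendered as a string) is an assumption for illustration, not something the visible lines confirm:

    import java.util.Map;

    import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;

    class RunIdCellSketch {
      // Sketch only: the stringified run id as the qualifier suffix is an
      // assumption; the patch truncates the call before this argument.
      static Object readFlowVersion(Map<byte[], byte[]> values, long runid)
          throws java.io.IOException {
        byte[] rq = ColumnHelper.getColumnQualifier(
            FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
            String.valueOf(runid));
        // the family map returned by HBase orders keys with a byte[]
        // comparator, so get() with an equal-content array finds the cell
        return GenericObjectMapper.read(values.get(rq));
      }
    }
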
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index bf524ea56bd..b0f83b7a0c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -21,20 +21,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,32 +37,16 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -125,11 +104,13 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMinMaxToHBase_user1";
     String flow = "testing_flowRun_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
+    long minStartTs = 10000000000000L;
+    long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
-        .getEntityMinStartTime();
+        .getEntityMinStartTime(minStartTs);
 
     try {
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -152,7 +133,7 @@ public class TestHBaseStorageFlowRun {
 
       // writer another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
-          .getEntityGreaterStartTime();
+          .getEntityGreaterStartTime(greaterStartTs);
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
@@ -183,24 +164,29 @@ public class TestHBaseStorageFlowRun {
         .getBytes());
     assertEquals(2, r1.size());
-    Long starttime = (Long) GenericObjectMapper.read(values
+    long starttime = (Long) GenericObjectMapper.read(values
         .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
-    Long expmin = entityMinStartTime.getCreatedTime();
-    assertEquals(expmin, starttime);
+    assertEquals(minStartTs, starttime);
     assertEquals(endTs, GenericObjectMapper.read(values
         .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
-  }
 
-  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, Long runid) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
-    assertTrue(rowKeyComponents.length == 4);
-    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
-    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
-    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    return true;
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      // get the flow run entity
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      FlowRunEntity flowRun = (FlowRunEntity)entity;
+      assertEquals(minStartTs, flowRun.getStartTime());
+      assertEquals(endTs, flowRun.getMaxEndTime());
+    } finally {
+      hbr.close();
+    }
   }
 
   /**
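
The deleted isFlowRunRowKeyCorrect helper documented the flow run row key layout, cluster!user!flow!inverted(runId), where the run id is stored inverted so that HBase's ascending byte order returns the most recent run first. That check now lives behind the reader, but the inversion is worth spelling out; a self-contained sketch of the presumed Long.MAX_VALUE complement behind TimelineWriterUtils.invert (an assumption about its body, consistent with the deleted assertion):

    class InvertSketch {
      // Presumed equivalent of TimelineWriterUtils.invert: subtracting from
      // Long.MAX_VALUE makes larger (newer) run ids sort first, and applying
      // the function twice recovers the original value.
      static long invert(long key) {
        return Long.MAX_VALUE - key;
      }

      public static void main(String[] args) {
        long runid = 1002345678919L;
        long inverted = invert(runid);
        // a newer run (runid + 1) inverts to a smaller key, so it sorts first
        System.out.println(invert(runid + 1) < inverted); // true
        System.out.println(invert(inverted) == runid);    // true: self-inverse
      }
    }
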
@@ -218,7 +204,7 @@ public class TestHBaseStorageFlowRun {
     String user = "testWriteFlowRunMetricsOneFlow_user1";
     String flow = "testing_flowRun_metrics_flow_name";
     String flowVersion = "CF7022C10F1354";
-    Long runid = 1002345678919L;
+    long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
@@ -244,6 +230,41 @@ public class TestHBaseStorageFlowRun {
 
     // check flow run
     checkFlowRunTable(cluster, user, flow, runid, c1);
+
+    // use the timeline reader to verify data
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineEntity entity =
+          hbr.getEntity(user, cluster, flow, runid, null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+      assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+      Set<TimelineMetric> metrics = entity.getMetrics();
+      assertEquals(2, metrics.size());
+      for (TimelineMetric metric : metrics) {
+        String id = metric.getId();
+        Map<Long, Number> values = metric.getValues();
+        assertEquals(1, values.size());
+        Number value = null;
+        for (Number n : values.values()) {
+          value = n;
+        }
+        switch (id) {
+        case metric1:
+          assertEquals(141, value);
+          break;
+        case metric2:
+          assertEquals(57, value);
+          break;
+        default:
+          fail("unrecognized metric: " + id);
+        }
+      }
+    } finally {
+      hbr.close();
+    }
   }
 
   private void checkFlowRunTable(String cluster, String user, String flow,