From 1f710484e5b8ab4d5c67379c012004e8a4242d15 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Tue, 1 Dec 2015 21:47:43 -0800 Subject: [PATCH] YARN-3862. Support for fetching specific configs and metrics based on prefixes (Varun Saxena via sjlee) --- .../reader/TimelineReaderManager.java | 4 +- .../reader/filter/TimelineCompareFilter.java | 61 ++ .../reader/filter/TimelineCompareOp.java | 36 + .../reader/filter/TimelineFilter.java | 56 ++ .../reader/filter/TimelineFilterList.java | 91 +++ .../reader/filter/TimelineFilterUtils.java | 120 +++ .../reader/filter/TimelinePrefixFilter.java | 56 ++ .../reader/filter/package-info.java | 28 + .../storage/ApplicationEntityReader.java | 123 +++- .../storage/FileSystemTimelineReaderImpl.java | 9 +- .../storage/FlowActivityEntityReader.java | 16 +- .../storage/FlowRunEntityReader.java | 69 +- .../storage/GenericEntityReader.java | 119 ++- .../storage/HBaseTimelineReaderImpl.java | 11 +- .../storage/TimelineEntityReader.java | 32 +- .../storage/TimelineEntityReaderFactory.java | 23 +- .../storage/TimelineReader.java | 32 + .../application/ApplicationColumnPrefix.java | 18 +- .../storage/common/ColumnPrefix.java | 29 +- .../storage/entity/EntityColumnPrefix.java | 18 +- .../flow/FlowActivityColumnPrefix.java | 18 +- .../storage/flow/FlowRunColumnPrefix.java | 18 +- .../TestFileSystemTimelineReaderImpl.java | 42 +- .../storage/TestHBaseTimelineStorage.java | 682 ++++++++++++++++-- .../flow/TestHBaseStorageFlowActivity.java | 6 +- .../storage/flow/TestHBaseStorageFlowRun.java | 190 ++++- 26 files changed, 1758 insertions(+), 149 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 27a50d528e7..294b05b5a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -77,7 +77,7 @@ public class TimelineReaderManager extends AbstractService { return reader.getEntities(userId, cluster, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, - metricFilters, eventFilters, fieldsToRetrieve); + metricFilters, eventFilters, null, null, fieldsToRetrieve); } /** @@ -91,6 +91,6 @@ public class TimelineReaderManager extends AbstractService { String entityId, EnumSet fields) throws IOException { String cluster = getClusterID(clusterId, getConfig()); return reader.getEntity(userId, cluster, flowId, flowRunId, appId, - entityType, entityId, fields); + entityType, entityId, null, null, fields); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java new file mode 100644 index 00000000000..14e71246342 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on key-value pair + * and the relation between them represented by different relational operators. + */ +@Private +@Unstable +public class TimelineCompareFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String key; + private Object value; + + public TimelineCompareFilter() { + } + + public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) { + this.compareOp = op; + this.key = key; + this.value = val; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.COMPARE; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } + + public String getKey() { + return key; + } + + public Object getValue() { + return value; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java new file mode 100644 index 00000000000..461a7d8571e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Comparison Operators. + */ +@Private +@Unstable +public enum TimelineCompareOp { + LESS_THAN, + LESS_OR_EQUAL, + EQUAL, + NOT_EQUAL, + GREATER_OR_EQUAL, + GREATER_THAN +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java new file mode 100644 index 00000000000..d4b4045449c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Abstract base class extended to implement timeline filters. + */ +@Private +@Unstable +public abstract class TimelineFilter { + + /** + * Lists the different filter types. + */ + @Private + @Unstable + public enum TimelineFilterType { + /** + * Combines multiple filters. + */ + LIST, + /** + * Filter which is used for comparison. + */ + COMPARE, + /** + * Filter which matches prefix for a config or a metric. + */ + PREFIX + } + + public abstract TimelineFilterType getFilterType(); + + public String toString() { + return this.getClass().getSimpleName(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java new file mode 100644 index 00000000000..8727bd79c15 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Implementation of {@link TimelineFilter} that represents an ordered list of + * timeline filters which will then be evaluated with a specified boolean + * operator {@link Operator#AND} or {@link Operator#OR}. Since you can use + * timeline filter lists as children of timeline filter lists, you can create a + * hierarchy of filters to be evaluated. + */ +@Private +@Unstable +public class TimelineFilterList extends TimelineFilter { + /** + * Specifies how filters in the filter list will be evaluated. AND means all + * the filters should match and OR means atleast one should match. + */ + @Private + @Unstable + public static enum Operator { + AND, + OR + } + + private Operator operator; + private List filterList = new ArrayList(); + + public TimelineFilterList(TimelineFilter...filters) { + this(Operator.AND, filters); + } + + public TimelineFilterList(Operator op, TimelineFilter...filters) { + this.operator = op; + this.filterList = new ArrayList(Arrays.asList(filters)); + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.LIST; + } + + /** + * Get the filter list. + * + * @return filterList + */ + public List getFilterList() { + return filterList; + } + + /** + * Get the operator. + * + * @return operator + */ + public Operator getOperator() { + return operator; + } + + public void setOperator(Operator op) { + operator = op; + } + + public void addFilter(TimelineFilter filter) { + filterList.add(filter); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java new file mode 100644 index 00000000000..da3c3836b0b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.hbase.filter.QualifierFilter; + +/** + * Set of utility methods used by timeline filter classes. + */ +public final class TimelineFilterUtils { + + private TimelineFilterUtils() { + } + + /** + * Returns the equivalent HBase filter list's {@link Operator}. + * @param op + * @return HBase filter list's Operator. + */ + private static Operator getHBaseOperator(TimelineFilterList.Operator op) { + switch (op) { + case AND: + return Operator.MUST_PASS_ALL; + case OR: + return Operator.MUST_PASS_ONE; + default: + throw new IllegalArgumentException("Invalid operator"); + } + } + + /** + * Returns the equivalent HBase compare filter's {@link CompareOp}. + * @param op + * @return HBase compare filter's CompareOp. + */ + private static CompareOp getHBaseCompareOp( + TimelineCompareOp op) { + switch (op) { + case LESS_THAN: + return CompareOp.LESS; + case LESS_OR_EQUAL: + return CompareOp.LESS_OR_EQUAL; + case EQUAL: + return CompareOp.EQUAL; + case NOT_EQUAL: + return CompareOp.NOT_EQUAL; + case GREATER_OR_EQUAL: + return CompareOp.GREATER_OR_EQUAL; + case GREATER_THAN: + return CompareOp.GREATER; + default: + throw new IllegalArgumentException("Invalid compare operator"); + } + } + + /** + * Converts a {@link TimelinePrefixFilter} to an equivalent HBase + * {@link QualifierFilter}. + * @param colPrefix + * @param filter + * @return a {@link QualifierFilter} object + */ + private static Filter createHBaseColQualPrefixFilter( + ColumnPrefix colPrefix, TimelinePrefixFilter filter) { + return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()), + new BinaryPrefixComparator( + colPrefix.getColumnPrefixBytes(filter.getPrefix()))); + } + + /** + * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList} + * while converting different timeline filters(of type {@link TimelineFilter}) + * into their equivalent HBase filters. + * @param colPrefix + * @param filterList + * @return a {@link FilterList} object + */ + public static FilterList createHBaseFilterList(ColumnPrefix colPrefix, + TimelineFilterList filterList) { + FilterList list = + new FilterList(getHBaseOperator(filterList.getOperator())); + for (TimelineFilter filter : filterList.getFilterList()) { + switch(filter.getFilterType()) { + case LIST: + list.addFilter( + createHBaseFilterList(colPrefix, (TimelineFilterList)filter)); + break; + case PREFIX: + list.addFilter(createHBaseColQualPrefixFilter( + colPrefix, (TimelinePrefixFilter)filter)); + break; + default: + break; + } + } + return list; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java new file mode 100644 index 00000000000..6233f264692 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Filter class which represents filter to be applied based on prefixes. + * Prefixes can either match or not match. + */ +@Private +@Unstable +public class TimelinePrefixFilter extends TimelineFilter { + + private TimelineCompareOp compareOp; + private String prefix; + + public TimelinePrefixFilter(TimelineCompareOp op, String prefix) { + this.prefix = prefix; + if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("CompareOp for prefix filter should " + + "be EQUAL or NOT_EQUAL"); + } + this.compareOp = op; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.PREFIX; + } + + public String getPrefix() { + return prefix; + } + + public TimelineCompareOp getCompareOp() { + return compareOp; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java new file mode 100644 index 00000000000..f7c07057d7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.server.timelineservice.reader.filter stores + * timeline filter implementations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java index 8324afd92a8..7082a5e36b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -28,11 +28,21 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -56,18 +66,21 @@ class ApplicationEntityReader extends GenericEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + true); } public ApplicationEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); } /** @@ -78,13 +91,95 @@ class ApplicationEntityReader extends GenericEntityReader { } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is releated to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @@ -115,6 +210,15 @@ class ApplicationEntityReader extends GenericEntityReader { if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } if (!singleEntityRead) { if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; @@ -136,7 +240,7 @@ class ApplicationEntityReader extends GenericEntityReader { @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); if (flowRunId != null) { scan.setRowPrefixFilter(ApplicationRowKey. @@ -145,7 +249,12 @@ class ApplicationEntityReader extends GenericEntityReader { scan.setRowPrefixFilter(ApplicationRowKey. getRowKeyPrefix(clusterId, userId, flowId)); } - scan.setFilter(new PageFilter(limit)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); return table.getResultScanner(hbaseConf, conn, scan); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 30d1d00740e..48bf844560b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; @@ -272,6 +273,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { if (limit == null || limit <= 0) { limit = DEFAULT_LIMIT; @@ -386,7 +388,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService @Override public TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) throws IOException { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) + throws IOException { String flowRunPath = getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); File dir = new File(new File(rootPath, ENTITIES_DIR), @@ -413,6 +417,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { String flowRunPath = getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); @@ -422,6 +427,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService return getEntities(dir, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java index 3e32128c45b..71dd0a1fe96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -58,14 +59,14 @@ class FlowActivityEntityReader extends TimelineEntityReader { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, null, null, fieldsToRetrieve, true); } public FlowActivityEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, String entityId, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + null, null, fieldsToRetrieve); } /** @@ -96,15 +97,20 @@ class FlowActivityEntityReader extends TimelineEntityReader { } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + return null; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { throw new UnsupportedOperationException( "we don't support a single entity query"); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); if (createdTimeBegin == DEFAULT_BEGIN_TIME && createdTimeEnd == DEFAULT_END_TIME) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java index ebf2d277f76..1895fa6b996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -28,12 +28,22 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; @@ -54,18 +64,20 @@ class FlowRunEntityReader extends TimelineEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true); } public FlowRunEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + null, metricsToRetrieve, fieldsToRetrieve); } /** @@ -101,26 +113,69 @@ class FlowRunEntityReader extends TimelineEntityReader { if (createdTimeEnd == null) { createdTimeEnd = DEFAULT_END_TIME; } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } } } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + // Metrics not required. + if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && + !fieldsToRetrieve.contains(Field.ALL)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + list.addFilter(infoColFamilyList); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); scan.setRowPrefixFilter( FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId)); - scan.setFilter(new PageFilter(limit)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); return table.getResultScanner(hbaseConf, conn, scan); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index 04fc8ee6f36..dcb8b89c04c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -32,9 +32,18 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -46,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -72,18 +82,21 @@ class GenericEntityReader extends TimelineEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve, boolean sortedKeys) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, sortedKeys); + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + sortedKeys); } public GenericEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); } /** @@ -93,6 +106,85 @@ class GenericEntityReader extends TimelineEntityReader { return ENTITY_TABLE; } + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + protected FlowContext lookupFlowContext(String clusterId, String appId, Configuration hbaseConf, Connection conn) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); @@ -145,6 +237,15 @@ class GenericEntityReader extends TimelineEntityReader { if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } if (!singleEntityRead) { if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; @@ -165,25 +266,31 @@ class GenericEntityReader extends TimelineEntityReader { } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, entityType, entityId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { // Scan through part of the table to find the entities belong to one app // and one type Scan scan = new Scan(); scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( clusterId, userId, flowId, flowRunId, appId, entityType)); scan.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } return table.getResultScanner(hbaseConf, conn, scan); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 889ae191612..9e4b26a08cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { @@ -64,11 +65,13 @@ public class HBaseTimelineReaderImpl @Override public TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId, - flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve); + flowId, flowRunId, appId, entityType, entityId, confsToRetrieve, + metricsToRetrieve, fieldsToRetrieve); return reader.readEntity(hbaseConf, conn); } @@ -80,13 +83,15 @@ public class HBaseTimelineReaderImpl Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException { TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, - metricFilters, eventFilters, fieldsToRetrieve); + metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve, + fieldsToRetrieve); return reader.readEntities(hbaseConf, conn); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java index adaf42ed666..7178aab4093 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java @@ -31,8 +31,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; @@ -70,6 +72,8 @@ abstract class TimelineEntityReader { protected Map configFilters; protected Set metricFilters; protected Set eventFilters; + protected TimelineFilterList confsToRetrieve; + protected TimelineFilterList metricsToRetrieve; /** * Main table the entity reader uses. @@ -94,6 +98,7 @@ abstract class TimelineEntityReader { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve, boolean sortedKeys) { this.singleEntityRead = false; this.sortedKeys = sortedKeys; @@ -115,6 +120,8 @@ abstract class TimelineEntityReader { this.configFilters = configFilters; this.metricFilters = metricFilters; this.eventFilters = eventFilters; + this.confsToRetrieve = confsToRetrieve; + this.metricsToRetrieve = metricsToRetrieve; this.table = getTable(); } @@ -124,7 +131,8 @@ abstract class TimelineEntityReader { */ protected TimelineEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { this.singleEntityRead = true; this.userId = userId; this.clusterId = clusterId; @@ -134,10 +142,20 @@ abstract class TimelineEntityReader { this.entityType = entityType; this.fieldsToRetrieve = fieldsToRetrieve; this.entityId = entityId; + this.confsToRetrieve = confsToRetrieve; + this.metricsToRetrieve = metricsToRetrieve; this.table = getTable(); } + /** + * Creates a {@link FilterList} based on fields, confs and metrics to + * retrieve. This filter list will be set in Scan/Get objects to trim down + * results fetched from HBase back-end storage. + * @return a {@link FilterList} object. + */ + protected abstract FilterList constructFilterListBasedOnFields(); + /** * Reads and deserializes a single timeline entity from the HBase storage. */ @@ -146,7 +164,8 @@ abstract class TimelineEntityReader { validateParams(); augmentParams(hbaseConf, conn); - Result result = getResult(hbaseConf, conn); + FilterList filterList = constructFilterListBasedOnFields(); + Result result = getResult(hbaseConf, conn, filterList); if (result == null || result.isEmpty()) { // Could not find a matching row. LOG.info("Cannot find matching entity of type " + entityType); @@ -166,7 +185,8 @@ abstract class TimelineEntityReader { augmentParams(hbaseConf, conn); NavigableSet entities = new TreeSet<>(); - ResultScanner results = getResults(hbaseConf, conn); + FilterList filterList = constructFilterListBasedOnFields(); + ResultScanner results = getResults(hbaseConf, conn, filterList); try { for (Result result : results) { TimelineEntity entity = parseEntity(result); @@ -211,14 +231,14 @@ abstract class TimelineEntityReader { * * @return the {@link Result} instance or null if no such record is found. */ - protected abstract Result getResult(Configuration hbaseConf, Connection conn) - throws IOException; + protected abstract Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException; /** * Fetches a {@link ResultScanner} for a multi-entity read. */ protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException; + Connection conn, FilterList filterList) throws IOException; /** * Given a {@link Result} instance, deserializes and creates a diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java index f5341c2e244..16204c59442 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; /** @@ -34,22 +35,23 @@ class TimelineEntityReaderFactory { */ public static TimelineEntityReader createSingleEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, - String entityType, String entityId, EnumSet fieldsToRetrieve) { + String entityType, String entityId, TimelineFilterList confs, + TimelineFilterList metrics, EnumSet fieldsToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve); } else { // assume we're dealing with a generic entity read return new GenericEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } } @@ -64,6 +66,7 @@ class TimelineEntityReaderFactory { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities @@ -71,8 +74,8 @@ class TimelineEntityReaderFactory { return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, @@ -83,15 +86,15 @@ class TimelineEntityReaderFactory { return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); } else { // assume we're dealing with a generic entity read return new GenericEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve, false); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve, false); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index e4e305e84d1..0ed17da4188 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; /** ATSv2 reader interface. */ @Private @@ -70,6 +72,18 @@ public interface TimelineReader extends Service { * Entity type (mandatory) * @param entityId * Entity Id (mandatory) + * @param confsToRetrieve + * Used for deciding which configs to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact config + * keys' or prefixes which are then compared against config keys' to decide + * configs to return in response. + * @param metricsToRetrieve + * Used for deciding which metrics to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact metric + * ids' or prefixes which are then compared against metric ids' to decide + * metrics to return in response. * @param fieldsToRetrieve * Specifies which fields of the entity object to retrieve(optional), see * {@link Field}. If null, retrieves 4 fields namely entity id, @@ -81,6 +95,7 @@ public interface TimelineReader extends Service { */ TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, String entityId, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException; /** @@ -139,6 +154,22 @@ public interface TimelineReader extends Service { * @param eventFilters * Matched entities should contain the given events (optional). If null * or empty, the filter is not applied. + * @param confsToRetrieve + * Used for deciding which configs to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact config + * keys' or prefixes which are then compared against config keys' to decide + * configs(inside entities) to return in response. This should not be + * confused with configFilters which is used to decide which entities to + * return instead. + * @param metricsToRetrieve + * Used for deciding which metrics to return in response. This is + * represented as a {@link TimelineFilterList} object containing + * {@link TimelinePrefixFilter} objects. These can either be exact metric + * ids' or prefixes which are then compared against metric ids' to decide + * metrics(inside entities) to return in response. This should not be + * confused with metricFilters which is used to decide which entities to + * return instead. * @param fieldsToRetrieve * Specifies which fields of the entity object to retrieve(optional), see * {@link Field}. If null, retrieves 4 fields namely entity id, @@ -158,5 +189,6 @@ public interface TimelineReader extends Service { Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) throws IOException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index b06f5c157ae..056e51fa6b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -119,6 +119,18 @@ public enum ApplicationColumnPrefix implements ColumnPrefix { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + /* * (non-Javadoc) * @@ -139,8 +151,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); @@ -166,8 +177,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index db4909825d9..0f3ac4e8d90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -44,13 +44,13 @@ public interface ColumnPrefix { * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. - *@param attributes attributes for the mutation that are used by the coprocessor - * to set/read the cell tags + * @param attributes attributes for the mutation that are used by the + * coprocessor to set/read the cell tags. * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ - public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + void store(byte[] rowKey, TypedBufferedMutator tableMutator, byte[] qualifier, Long timestamp, Object inputValue, Attribute... attributes) throws IOException; @@ -65,13 +65,13 @@ public interface ColumnPrefix { * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. - *@param attributes attributes for the mutation that are used by the coprocessor - * to set/read the cell tags + * @param attributes attributes for the mutation that are used by the + * coprocessor to set/read the cell tags. * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ - public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + void store(byte[] rowKey, TypedBufferedMutator tableMutator, String qualifier, Long timestamp, Object inputValue, Attribute... attributes) throws IOException; @@ -86,7 +86,7 @@ public interface ColumnPrefix { * in the result. * @throws IOException */ - public Object readResult(Result result, String qualifier) throws IOException; + Object readResult(Result result, String qualifier) throws IOException; /** * @param result from which to read columns @@ -94,7 +94,7 @@ public interface ColumnPrefix { * (or all of them if the prefix value is null). * @throws IOException */ - public Map readResults(Result result) throws IOException; + Map readResults(Result result) throws IOException; /** * @param result from which to reads data with timestamps @@ -104,7 +104,18 @@ public interface ColumnPrefix { * idB={timestamp3->value3}, idC={timestamp1->value4}} * @throws IOException */ - public NavigableMap> + NavigableMap> readResultsWithTimestamps(Result result) throws IOException; + /** + * @param qualifierPrefix Column qualifier or prefix of qualifier. + * @return a byte array encoding column prefix and qualifier/prefix passed. + */ + byte[] getColumnPrefixBytes(String qualifierPrefix); + + /** + * @param qualifierPrefix Column qualifier or prefix of qualifier. + * @return a byte array encoding column prefix and qualifier/prefix passed. + */ + byte[] getColumnPrefixBytes(byte[] qualifierPrefix); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index abede9cb017..5b712286b63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -119,6 +119,18 @@ public enum EntityColumnPrefix implements ColumnPrefix { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + /* * (non-Javadoc) * @@ -140,8 +152,7 @@ public enum EntityColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); @@ -167,8 +178,7 @@ public enum EntityColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 38c0f3f3e2e..21ddcc21a67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -83,6 +83,18 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + public byte[] getColumnPrefixBytes() { return columnPrefixBytes.clone(); } @@ -112,8 +124,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, @@ -233,8 +244,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, null, inputValue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index eb055fe052a..e3bb52d0d40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -89,8 +89,16 @@ public enum FlowRunColumnPrefix implements ColumnPrefix { return columnPrefixBytes.clone(); } - public byte[] getColumnPrefixBytes(String qualifier) { - return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); } public byte[] getColumnFamilyBytes() { @@ -121,8 +129,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, @@ -149,8 +156,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix { + tableMutator.getName().getNameAsString()); } - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java index 4e23e49cd01..e864d6124c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -266,7 +266,7 @@ public class TestFileSystemTimelineReaderImpl { // only the id, created and modified time TimelineEntity result = reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", - "app", "id_1", null); + "app", "id_1", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -281,7 +281,7 @@ public class TestFileSystemTimelineReaderImpl { // Cluster and AppId should be enough to get an entity. TimelineEntity result = reader.getEntity(null, "cluster1", null, null, "app1", - "app", "id_1", null); + "app", "id_1", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -298,7 +298,7 @@ public class TestFileSystemTimelineReaderImpl { // in app flow mapping csv has commas. TimelineEntity result = reader.getEntity(null, "cluster1", null, null, "app2", - "app", "id_5", null); + "app", "id_5", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_5")).toString(), result.getIdentifier().toString()); @@ -311,7 +311,7 @@ public class TestFileSystemTimelineReaderImpl { // Specified fields in addition to default view will be returned. TimelineEntity result = reader.getEntity("user1", "cluster1", "flow1", 1L, - "app1", "app", "id_1", + "app1", "app", "id_1", null, null, EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS)); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), @@ -329,8 +329,8 @@ public class TestFileSystemTimelineReaderImpl { public void testGetEntityAllFields() throws Exception { // All fields of TimelineEntity will be returned. TimelineEntity result = - reader.getEntity("user1", "cluster1", "flow1", 1L, - "app1", "app", "id_1", EnumSet.of(Field.ALL)); + reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", "app", + "id_1", null, null, EnumSet.of(Field.ALL)); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -347,7 +347,7 @@ public class TestFileSystemTimelineReaderImpl { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); // All 3 entities will be returned Assert.assertEquals(4, result.size()); } @@ -357,7 +357,7 @@ public class TestFileSystemTimelineReaderImpl { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", 2L, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Needs to be rewritten once hashcode and equals for // TimelineEntity is implemented @@ -371,7 +371,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", 3L, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); // Even though 2 entities out of 4 have same created time, one entity // is left out due to limit Assert.assertEquals(3, result.size()); @@ -383,7 +383,7 @@ public class TestFileSystemTimelineReaderImpl { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, 1425016502030L, 1425016502060L, null, null, null, null, null, - null, null, null, null); + null, null, null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_4 should be returned. for (TimelineEntity entity : result) { @@ -396,7 +396,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, 1425016502010L, null, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(3, result.size()); for (TimelineEntity entity : result) { if (entity.getId().equals("id_4")) { @@ -408,7 +408,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, 1425016502010L, null, null, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_4")) { @@ -420,7 +420,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, 1425016502090L, 1425016503020L, null, null, null, - null, null, null, null); + null, null, null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_4 should be returned. for (TimelineEntity entity : result) { @@ -433,7 +433,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, 1425016502090L, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(2, result.size()); for (TimelineEntity entity : result) { if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) { @@ -445,7 +445,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, 1425016503005L, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_4")) { @@ -462,7 +462,7 @@ public class TestFileSystemTimelineReaderImpl { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, infoFilters, null, null, - null, null); + null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_3 should be returned. for (TimelineEntity entity : result) { @@ -478,7 +478,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, configFilters, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { @@ -493,7 +493,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, null, - eventFilters, null); + eventFilters, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_3")) { @@ -507,7 +507,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, metricFilters, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_2 should be returned. for (TimelineEntity entity : result) { @@ -527,7 +527,7 @@ public class TestFileSystemTimelineReaderImpl { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, relatesTo, null, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_1 should be returned. for (TimelineEntity entity : result) { @@ -544,7 +544,7 @@ public class TestFileSystemTimelineReaderImpl { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, isRelatedTo, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_3 should be returned. for (TimelineEntity entity : result) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 30ead40f750..bc7b3a43d17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -60,11 +65,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + /** * Various tests to test writing entities to HBase and reading them back from * it. @@ -79,18 +90,344 @@ import org.junit.Test; public class TestHBaseTimelineStorage { private static HBaseTestingUtility util; + private HBaseTimelineReaderImpl reader; @BeforeClass public static void setupBeforeClass() throws Exception { util = new HBaseTestingUtility(); util.startMiniCluster(); createSchema(); + loadEntities(); + loadApps(); } private static void createSchema() throws IOException { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + private static void loadApps() throws IOException { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "application_1111111111_2222"; + entity.setId(id); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + conf.put("cfg_param1", "value3"); + entity.addConfigs(conf); + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m12 = new TimelineMetric(); + m12.setId("MAP1_BYTES"); + m12.addValue(ts, 50); + metrics.add(m12); + entity.addMetrics(metrics); + TimelineEvent event = new TimelineEvent(); + event.setId("event1"); + event.setTimestamp(ts - 2000); + entity.addEvent(event); + te.addEntity(entity); + + TimelineEntities te1 = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + String id1 = "application_1111111111_3333"; + entity1.setId(id1); + entity1.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entity1.setCreatedTime(cTime); + entity1.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map infoMap1 = new HashMap(); + infoMap1.put("infoMapKey1", "infoMapValue1"); + infoMap1.put("infoMapKey2", 10); + entity1.addInfo(infoMap1); + + // add the isRelatedToEntity info + String key1 = "task"; + String value1 = "is_related_to_entity_id_here"; + Set isRelatedToSet1 = new HashSet(); + isRelatedToSet1.add(value1); + Map> isRelatedTo1 = new HashMap>(); + isRelatedTo1.put(key, isRelatedToSet1); + entity1.setIsRelatedToEntities(isRelatedTo1); + + // add the relatesTo info + key1 = "container"; + value1 = "relates_to_entity_id_here"; + Set relatesToSet1 = new HashSet(); + relatesToSet1.add(value1); + value1 = "relates_to_entity_id_here_Second"; + relatesToSet1.add(value1); + Map> relatesTo1 = new HashMap>(); + relatesTo1.put(key1, relatesToSet1); + entity1.setRelatesToEntities(relatesTo1); + + // add some config entries + Map conf1 = new HashMap(); + conf1.put("cfg_param1", "value1"); + conf1.put("cfg_param2", "value2"); + entity1.addConfigs(conf1); + + // add metrics + Set metrics1 = new HashSet<>(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP1_SLOT_MILLIS"); + Map metricValues1 = new HashMap(); + long ts1 = System.currentTimeMillis(); + metricValues1.put(ts1 - 120000, 100000000); + metricValues1.put(ts1 - 100000, 200000000); + metricValues1.put(ts1 - 80000, 300000000); + metricValues1.put(ts1 - 60000, 400000000); + metricValues1.put(ts1 - 40000, 50000000000L); + metricValues1.put(ts1 - 20000, 60000000000L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues1); + metrics1.add(m2); + entity1.addMetrics(metrics1); + te1.addEntity(entity1); + + TimelineEntities te2 = new TimelineEntities(); + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "application_1111111111_4444"; + entity2.setId(id2); + entity2.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entity2.setCreatedTime(cTime); + entity2.setModifiedTime(mTime); + te2.addEntity(entity2); + HBaseTimelineWriterImpl hbi = null; + try { + hbi = new HBaseTimelineWriterImpl(util.getConfiguration()); + hbi.init(util.getConfiguration()); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "application_1111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + appName = "application_1111111111_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + appName = "application_1111111111_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te2); + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + private static void loadEntities() throws IOException { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + conf.put("cfg_param1", "value3"); + entity.addConfigs(conf); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m12 = new TimelineMetric(); + m12.setId("MAP1_BYTES"); + m12.addValue(ts, 50); + metrics.add(m12); + entity.addMetrics(metrics); + te.addEntity(entity); + + TimelineEntity entity1 = new TimelineEntity(); + String id1 = "hello1"; + entity1.setId(id1); + entity1.setType(type); + entity1.setCreatedTime(cTime); + entity1.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map infoMap1 = new HashMap(); + infoMap1.put("infoMapKey1", "infoMapValue1"); + infoMap1.put("infoMapKey2", 10); + entity1.addInfo(infoMap1); + + // add the isRelatedToEntity info + String key1 = "task"; + String value1 = "is_related_to_entity_id_here"; + Set isRelatedToSet1 = new HashSet(); + isRelatedToSet1.add(value1); + Map> isRelatedTo1 = new HashMap>(); + isRelatedTo1.put(key, isRelatedToSet1); + entity1.setIsRelatedToEntities(isRelatedTo1); + + // add the relatesTo info + key1 = "container"; + value1 = "relates_to_entity_id_here"; + Set relatesToSet1 = new HashSet(); + relatesToSet1.add(value1); + value1 = "relates_to_entity_id_here_Second"; + relatesToSet1.add(value1); + Map> relatesTo1 = new HashMap>(); + relatesTo1.put(key1, relatesToSet1); + entity1.setRelatesToEntities(relatesTo1); + + // add some config entries + Map conf1 = new HashMap(); + conf1.put("cfg_param1", "value1"); + conf1.put("cfg_param2", "value2"); + entity1.addConfigs(conf1); + + // add metrics + Set metrics1 = new HashSet<>(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP1_SLOT_MILLIS"); + Map metricValues1 = new HashMap(); + long ts1 = System.currentTimeMillis(); + metricValues1.put(ts1 - 120000, 100000000); + metricValues1.put(ts1 - 100000, 200000000); + metricValues1.put(ts1 - 80000, 300000000); + metricValues1.put(ts1 - 60000, 400000000); + metricValues1.put(ts1 - 40000, 50000000000L); + metricValues1.put(ts1 - 20000, 60000000000L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues1); + metrics1.add(m2); + entity1.addMetrics(metrics1); + te.addEntity(entity1); + + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "hello2"; + entity2.setId(id2); + entity2.setType(type); + entity2.setCreatedTime(cTime); + entity2.setModifiedTime(mTime); + te.addEntity(entity2); + HBaseTimelineWriterImpl hbi = null; + try { + hbi = new HBaseTimelineWriterImpl(util.getConfiguration()); + hbi.init(util.getConfiguration()); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "application_1231111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + @Before + public void init() throws Exception { + reader = new HBaseTimelineReaderImpl(); + reader.init(util.getConfiguration()); + reader.start(); + } + + @After + public void stop() throws Exception { + if (reader != null) { + reader.stop(); + reader.close(); + } + } + private static void matchMetrics(Map m1, Map m2) { assertEquals(m1.size(), m2.size()); for (Map.Entry entry : m2.entrySet()) { @@ -163,15 +500,11 @@ public class TestHBaseTimelineStorage { te.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_write_app"; String user = "user1"; String flow = "some_flow_name"; @@ -256,8 +589,8 @@ public class TestHBaseTimelineStorage { matchMetrics(metricValues, metricMap); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appId, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); @@ -290,10 +623,6 @@ public class TestHBaseTimelineStorage { hbi.stop(); hbi.close(); } - if (hbr != null) { - hbr.stop(); - hbr.close(); - } } } @@ -362,15 +691,11 @@ public class TestHBaseTimelineStorage { te.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_write_entity"; String user = "user1"; String flow = "some_flow_name"; @@ -468,12 +793,13 @@ public class TestHBaseTimelineStorage { assertEquals(17, colCount); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); - Set es1 = hbr.getEntities(user, cluster, flow, runid, + Set es1 = reader.getEntities(user, cluster, flow, runid, appName, entity.getType(), null, null, null, null, null, null, null, - null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); + null, null, null, null, null, null, + EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); @@ -505,10 +831,6 @@ public class TestHBaseTimelineStorage { hbi.stop(); hbi.close(); } - if (hbr != null) { - hbr.stop(); - hbr.close(); - } } } @@ -559,15 +881,11 @@ public class TestHBaseTimelineStorage { entities.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_events"; String user = "user2"; String flow = "other_flow_name"; @@ -612,11 +930,11 @@ public class TestHBaseTimelineStorage { } // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); - TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName, - entity.getType(), entity.getId(), + TimelineEntity e2 = reader.getEntity(user, cluster, null, null, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertNotNull(e2); @@ -641,10 +959,6 @@ public class TestHBaseTimelineStorage { hbi.stop(); hbi.close(); } - if (hbr != null) { - hbr.stop(); - hbr.close(); - } } } @@ -665,15 +979,11 @@ public class TestHBaseTimelineStorage { entities.addEntity(entity); HBaseTimelineWriterImpl hbi = null; - HBaseTimelineReaderImpl hbr = null; try { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); hbi.start(); - hbr = new HBaseTimelineReaderImpl(); - hbr.init(c1); - hbr.start(); String cluster = "cluster_test_empty_eventkey"; String user = "user_emptyeventkey"; String flow = "other_flow_name"; @@ -726,12 +1036,13 @@ public class TestHBaseTimelineStorage { assertEquals(1, rowCount); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName, + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); - Set es1 = hbr.getEntities(user, cluster, flow, runid, + Set es1 = reader.getEntities(user, cluster, flow, runid, appName, entity.getType(), null, null, null, null, null, null, null, - null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); + null, null, null, null, null, null, + EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); @@ -748,8 +1059,6 @@ public class TestHBaseTimelineStorage { } finally { hbi.stop(); hbi.close(); - hbr.stop();; - hbr.close(); } } @@ -816,6 +1125,291 @@ public class TestHBaseTimelineStorage { } } + @Test + public void testReadEntities() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1231111111_1111","world", "hello", null, + null, EnumSet.of(Field.ALL)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(1, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.ALL)); + assertEquals(3, es1.size()); + } + + @Test + public void testReadEntitiesDefaultView() throws Exception { + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", null, null, null); + assertNotNull(e1); + assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() && + e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() && + e1.getRelatesToEntities().isEmpty()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, null); + assertEquals(3, es1.size()); + for (TimelineEntity e : es1) { + assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() && + e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() && + e.getRelatesToEntities().isEmpty()); + } + } + + @Test + public void testReadEntitiesByFields() throws Exception { + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", null, null, + EnumSet.of(Field.INFO, Field.CONFIGS)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(0, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)); + assertEquals(3, es1.size()); + int metricsCnt = 0; + int isRelatedToCnt = 0; + int infoCnt = 0; + for (TimelineEntity entity : es1) { + metricsCnt += entity.getMetrics().size(); + isRelatedToCnt += entity.getIsRelatedToEntities().size(); + infoCnt += entity.getInfo().size(); + } + assertEquals(0, infoCnt); + assertEquals(2, isRelatedToCnt); + assertEquals(3, metricsCnt); + } + + @Test + public void testReadEntitiesConfigPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", list, null, null); + assertNotNull(e1); + assertEquals(1, e1.getConfigs().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, + list, null, null); + int cfgCnt = 0; + for (TimelineEntity entity : es1) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(3, cfgCnt); + } + + @Test + public void testReadEntitiesConfigFilterPrefix() throws Exception { + Map confFilters = ImmutableMap.of("cfg_param1","value1"); + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + Set entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, confFilters, null, null, + list, null, null); + assertEquals(1, entities.size()); + int cfgCnt = 0; + for (TimelineEntity entity : entities) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(2, cfgCnt); + } + + @Test + public void testReadEntitiesMetricPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1231111111_1111","world", "hello", null, list, null); + assertNotNull(e1); + assertEquals(1, e1.getMetrics().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + list, null); + int metricCnt = 0; + for (TimelineEntity entity : es1) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } + + @Test + public void testReadEntitiesMetricFilterPrefix() throws Exception { + Set metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS"); + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + Set entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1231111111_1111","world", + null, null, null, null, null, null, null, null, null, metricFilters, + null, null, list, null); + assertEquals(1, entities.size()); + int metricCnt = 0; + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(1, metricCnt); + } + + @Test + public void testReadApps() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, + EnumSet.of(Field.ALL)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(1, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, + EnumSet.of(Field.ALL)); + assertEquals(3, es1.size()); + } + + @Test + public void testReadAppsDefaultView() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null); + assertNotNull(e1); + assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() && + e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() && + e1.getRelatesToEntities().isEmpty()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, null); + assertEquals(3, es1.size()); + for (TimelineEntity e : es1) { + assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() && + e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() && + e.getRelatesToEntities().isEmpty()); + } + } + + @Test + public void testReadAppsByFields() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, + EnumSet.of(Field.INFO, Field.CONFIGS)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(0, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, + EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)); + assertEquals(3, es1.size()); + int metricsCnt = 0; + int isRelatedToCnt = 0; + int infoCnt = 0; + for (TimelineEntity entity : es1) { + metricsCnt += entity.getMetrics().size(); + isRelatedToCnt += entity.getIsRelatedToEntities().size(); + infoCnt += entity.getInfo().size(); + } + assertEquals(0, infoCnt); + assertEquals(2, isRelatedToCnt); + assertEquals(3, metricsCnt); + } + + @Test + public void testReadAppsConfigPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null); + assertNotNull(e1); + assertEquals(1, e1.getConfigs().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, list, null, null); + int cfgCnt = 0; + for (TimelineEntity entity : es1) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(3, cfgCnt); + } + + @Test + public void testReadAppsConfigFilterPrefix() throws Exception { + Map confFilters = ImmutableMap.of("cfg_param1","value1"); + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_")); + Set entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, confFilters, null, null, list, null, null); + assertEquals(1, entities.size()); + int cfgCnt = 0; + for (TimelineEntity entity : entities) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(2, cfgCnt); + } + + @Test + public void testReadAppsMetricPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null); + assertNotNull(e1); + assertEquals(1, e1.getMetrics().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, list, null); + int metricCnt = 0; + for (TimelineEntity entity : es1) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } + + @Test + public void testReadAppsMetricFilterPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_")); + Set metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS"); + Set entities = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, metricFilters, null, null, list, null); + int metricCnt = 0; + assertEquals(1, entities.size()); + for (TimelineEntity entity : entities) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(1, metricCnt); + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index c957dad075d..434adacf9f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -182,7 +182,7 @@ public class TestHBaseStorageFlowActivity { Set entities = hbr.getEntities(null, cluster, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity flowActivity = (FlowActivityEntity)e; @@ -238,7 +238,7 @@ public class TestHBaseStorageFlowActivity { Set entities = hbr.getEntities(user, cluster, flow, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity entity = (FlowActivityEntity)e; @@ -353,7 +353,7 @@ public class TestHBaseStorageFlowActivity { Set entities = hbr.getEntities(null, cluster, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity flowActivity = (FlowActivityEntity)e; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 4fb8f0e000e..5da01920ae3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.EnumSet; import java.util.Map; import java.util.Set; @@ -44,9 +45,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -178,9 +183,8 @@ public class TestHBaseStorageFlowRun { hbr.init(c1); hbr.start(); // get the flow run entity - TimelineEntity entity = - hbr.getEntity(user, cluster, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null, null); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); FlowRunEntity flowRun = (FlowRunEntity)entity; assertEquals(minStartTs, flowRun.getStartTime()); @@ -238,9 +242,8 @@ public class TestHBaseStorageFlowRun { hbr = new HBaseTimelineReaderImpl(); hbr.init(c1); hbr.start(); - TimelineEntity entity = - hbr.getEntity(user, cluster, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null, null); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); Set metrics = entity.getMetrics(); assertEquals(2, metrics.size()); @@ -305,6 +308,181 @@ public class TestHBaseStorageFlowRun { assertEquals(1, rowCount); } + @Test + public void testWriteFlowRunMetricsPrefix() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + TimelineFilterList metricsToRetrieve = + new TimelineFilterList(new TimelinePrefixFilter(TimelineCompareOp.EQUAL, + metric1.substring(0, metric1.indexOf("_") + 1))); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, + metricsToRetrieve, null); + assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); + Set metrics = entity.getMetrics(); + assertEquals(1, metrics.size()); + for (TimelineMetric metric : metrics) { + String id = metric.getId(); + Map values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + + Set entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, + metricsToRetrieve, null); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set timelineMetrics = timelineEntity.getMetrics(); + assertEquals(1, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + + @Test + public void testWriteFlowRunsMetricFields() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + Set entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, null, null); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + assertEquals(0, timelineEntity.getMetrics().size()); + } + + entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.METRICS)); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set timelineMetrics = timelineEntity.getMetrics(); + assertEquals(2, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141L, value); + break; + case metric2: + assertEquals(57L, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster();