From a01091e90c18575e75eb1ecc3e1e2feea5146d20 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Wed, 21 Mar 2018 08:11:19 +0530 Subject: [PATCH] YARN-7581. HBase filters are not constructed correctly in ATSv2. Contributed by Habio Chen. (cherry picked from commit 29acea5000337a7f529bb1810a2af2b0af4d5f1d) --- .../reader/ApplicationEntityReader.java | 14 ++++-- .../reader/FlowActivityEntityReader.java | 4 +- .../storage/reader/FlowRunEntityReader.java | 7 ++- .../storage/reader/GenericEntityReader.java | 12 +++-- .../reader/SubApplicationEntityReader.java | 15 ++++-- .../storage/reader/TimelineEntityReader.java | 46 +++++++++++++++++-- 6 files changed, 83 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 7440316c9e2..29ba1845db9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; @@ -258,7 +259,8 @@ class ApplicationEntityReader extends GenericEntityReader { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -268,6 +270,8 @@ class ApplicationEntityReader extends GenericEntityReader { createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); + cfsInFields.add( + Bytes.toString(ApplicationColumnFamily.CONFIGS.getBytes())); } // Please note that if metricsToRetrieve is specified, we would have added @@ -278,11 +282,14 @@ class ApplicationEntityReader extends GenericEntityReader { createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); + cfsInFields.add( + Bytes.toString(ApplicationColumnFamily.METRICS.getBytes())); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. return null; @@ -303,8 +310,9 @@ class ApplicationEntityReader extends GenericEntityReader { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); + cfsInFields.add(Bytes.toString(ApplicationColumnFamily.INFO.getBytes())); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index d0a0f3bb46e..7b7eef57015 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -97,7 +98,8 @@ class FlowActivityEntityReader extends TimelineEntityReader { } @Override - protected FilterList constructFilterListBasedOnFields() { + protected FilterList constructFilterListBasedOnFields( + Set cfsInFields) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 33a2cf67a27..80d3e9b3363 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.EnumSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; @@ -152,7 +154,8 @@ class FlowRunEntityReader extends TimelineEntityReader { } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields( + Set cfsInFields) throws IOException { FilterList list = new FilterList(Operator.MUST_PASS_ONE); // By default fetch everything in INFO column family. FamilyFilter infoColumnFamily = @@ -166,6 +169,7 @@ class FlowRunEntityReader extends TimelineEntityReader { && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); infoColFamilyList.addFilter(infoColumnFamily); + cfsInFields.add(Bytes.toString(FlowRunColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC .getColumnPrefixBytes("")))); @@ -182,6 +186,7 @@ class FlowRunEntityReader extends TimelineEntityReader { && !metricsToRetrieve.getFilterList().isEmpty()) { FilterList infoColFamilyList = new FilterList(); infoColFamilyList.addFilter(infoColumnFamily); + cfsInFields.add(Bytes.toString(FlowRunColumnFamily.INFO.getBytes())); FilterList columnsList = updateFixedColumns(); columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( FlowRunColumnPrefix.METRIC, metricsToRetrieve)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 02eca84f1f2..6e62f20aa7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; @@ -349,7 +350,8 @@ class GenericEntityReader extends TimelineEntityReader { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -359,6 +361,7 @@ class GenericEntityReader extends TimelineEntityReader { .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG)); + cfsInFields.add(Bytes.toString(EntityColumnFamily.CONFIGS.getBytes())); } // Please note that if metricsToRetrieve is specified, we would have added @@ -369,11 +372,13 @@ class GenericEntityReader extends TimelineEntityReader { .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); + cfsInFields.add(Bytes.toString(EntityColumnFamily.METRICS.getBytes())); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. return null; @@ -394,7 +399,8 @@ class GenericEntityReader extends TimelineEntityReader { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + cfsInFields.add(Bytes.toString(EntityColumnFamily.INFO.getBytes())); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java index faed34857d7..6a91c7b46d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; @@ -247,7 +248,8 @@ class SubApplicationEntityReader extends GenericEntityReader { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -258,6 +260,8 @@ class SubApplicationEntityReader extends GenericEntityReader { dataToRetrieve.getConfsToRetrieve(), SubApplicationColumnFamily.CONFIGS, SubApplicationColumnPrefix.CONFIG)); + cfsInFields.add( + Bytes.toString(SubApplicationColumnFamily.CONFIGS.getBytes())); } // Please note that if metricsToRetrieve is specified, we would have added @@ -269,11 +273,14 @@ class SubApplicationEntityReader extends GenericEntityReader { dataToRetrieve.getMetricsToRetrieve(), SubApplicationColumnFamily.METRICS, SubApplicationColumnPrefix.METRIC)); + cfsInFields.add( + Bytes.toString(SubApplicationColumnFamily.METRICS.getBytes())); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. return null; @@ -293,7 +300,9 @@ class SubApplicationEntityReader extends GenericEntityReader { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + cfsInFields.add( + Bytes.toString(SubApplicationColumnFamily.INFO.getBytes())); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 3168163ed96..43ba2afe739 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -30,11 +30,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @@ -122,11 +127,12 @@ public abstract class TimelineEntityReader extends * results fetched from HBase back-end storage. This is called only for * multiple entity reads. * + * @param cfsInFields column families in the fields * @return a {@link FilterList} object. * @throws IOException if any problem occurs while creating filter list. */ - protected abstract FilterList constructFilterListBasedOnFields() - throws IOException; + protected abstract FilterList constructFilterListBasedOnFields( + Set cfsInFields) throws IOException; /** * Creates a {@link FilterList} based on info, config and metric filters. This @@ -151,7 +157,9 @@ public abstract class TimelineEntityReader extends FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); boolean hasListBasedOnFilters = listBasedOnFilters != null && !listBasedOnFilters.getFilters().isEmpty(); - FilterList listBasedOnFields = constructFilterListBasedOnFields(); + Set cfsInListBasedOnFields = new HashSet<>(0); + FilterList listBasedOnFields = + constructFilterListBasedOnFields(cfsInListBasedOnFields); boolean hasListBasedOnFields = listBasedOnFields != null && !listBasedOnFields.getFilters().isEmpty(); // If filter lists based on both filters and fields can be created, @@ -164,6 +172,21 @@ public abstract class TimelineEntityReader extends if (hasListBasedOnFilters && hasListBasedOnFields) { FilterList list = new FilterList(); list.addFilter(listBasedOnFilters); + + Set cfsInListBasedOnFilters = new HashSet<>(0); + extractColumnFamiliesFromFiltersBasedOnFilters( + listBasedOnFilters, cfsInListBasedOnFilters); + + // must exclude cfs that are already covered in fields-based filters + // otherwise we will return the whole cf + cfsInListBasedOnFilters.removeAll(cfsInListBasedOnFields); + + if (!cfsInListBasedOnFilters.isEmpty()) { + for (String cf: cfsInListBasedOnFilters) { + listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(Bytes.toBytes(cf)))); + } + } list.addFilter(listBasedOnFields); return list; } else if (hasListBasedOnFilters) { @@ -174,6 +197,21 @@ public abstract class TimelineEntityReader extends return null; } + private static void extractColumnFamiliesFromFiltersBasedOnFilters( + Filter hbaseFilterBasedOnTLSFilter, Set columnFamilies) { + if (hbaseFilterBasedOnTLSFilter instanceof SingleColumnValueFilter) { + byte[] cf = ((SingleColumnValueFilter) + hbaseFilterBasedOnTLSFilter).getFamily(); + columnFamilies.add(Bytes.toString(cf)); + } else if (hbaseFilterBasedOnTLSFilter instanceof FilterList) { + FilterList filterListBase = (FilterList) hbaseFilterBasedOnTLSFilter; + for (Filter fs: filterListBase.getFilters()) { + extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies); + } + } + } + + protected TimelineDataToRetrieve getDataToRetrieve() { return dataToRetrieve; } @@ -206,7 +244,7 @@ public abstract class TimelineEntityReader extends validateParams(); augmentParams(hbaseConf, conn); - FilterList filterList = constructFilterListBasedOnFields(); + FilterList filterList = constructFilterListBasedOnFields(new HashSet<>(0)); if (LOG.isDebugEnabled() && filterList != null) { LOG.debug("FilterList created for get is - " + filterList); }