YARN-7581. HBase filters are not constructed correctly in ATSv2. Contributed by Haibo Chen.

(cherry picked from commit 29acea5000)
Committed by Rohith Sharma K S on 2018-03-21 08:11:19 +05:30
parent d8764f1c90
commit bbfe36d686
6 changed files with 83 additions and 15 deletions
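What the patch does, in brief: each reader's constructFilterListBasedOnFields() now also reports, through the new cfsInFields set, which column families its fields-based FilterList already passes. When TimelineEntityReader combines the filters-based and fields-based lists, it collects the column families referenced by SingleColumnValueFilters in the filters-based list and, for any family not already covered by the fields-based list, appends an explicit FamilyFilter so those cells are no longer dropped by the combined filter. Below is a minimal standalone sketch of that combination logic; the class and method names are illustrative and not part of the patch, only the HBase filter calls mirror the ones added here. A usage example of this sketch follows the last file below.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative class name; a sketch of the combination logic, not the patch itself.
public final class FilterCombinationSketch {

  // AND the filters-based and fields-based lists, first making sure the
  // fields-based list passes every family that a value filter needs to read.
  public static FilterList combine(FilterList listBasedOnFilters,
      FilterList listBasedOnFields, Set<String> cfsCoveredByFields) {
    Set<String> cfsInFilters = new HashSet<>();
    collectFamilies(listBasedOnFilters, cfsInFilters);
    // Families already covered by the fields-based list must be skipped,
    // otherwise a blanket FamilyFilter would return the whole family.
    cfsInFilters.removeAll(cfsCoveredByFields);
    for (String cf : cfsInFilters) {
      listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL,
          new BinaryComparator(Bytes.toBytes(cf))));
    }
    FilterList combined = new FilterList(Operator.MUST_PASS_ALL);
    combined.addFilter(listBasedOnFilters);
    combined.addFilter(listBasedOnFields);
    return combined;
  }

  // Recursively collect the column families referenced by
  // SingleColumnValueFilters inside a (possibly nested) FilterList.
  private static void collectFamilies(Filter filter, Set<String> families) {
    if (filter instanceof SingleColumnValueFilter) {
      families.add(
          Bytes.toString(((SingleColumnValueFilter) filter).getFamily()));
    } else if (filter instanceof FilterList) {
      for (Filter child : ((FilterList) filter).getFilters()) {
        collectFamilies(child, families);
      }
    }
  }
}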

File: ApplicationEntityReader.java (org.apache.hadoop.yarn.server.timelineservice.storage.reader)

@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
@@ -258,7 +259,8 @@ class ApplicationEntityReader extends GenericEntityReader {
    * @throws IOException if any problem occurs while updating filter list.
    */
   private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
+      FilterList listBasedOnFields, Set<String> cfsInFields)
+      throws IOException {
     TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
     // Please note that if confsToRetrieve is specified, we would have added
     // CONFS to fields to retrieve in augmentParams() even if not specified.
@@ -268,6 +270,8 @@ class ApplicationEntityReader extends GenericEntityReader {
           createFilterForConfsOrMetricsToRetrieve(
               dataToRetrieve.getConfsToRetrieve(),
               ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG));
+      cfsInFields.add(
+          Bytes.toString(ApplicationColumnFamily.CONFIGS.getBytes()));
     }

     // Please note that if metricsToRetrieve is specified, we would have added
@@ -278,11 +282,14 @@ class ApplicationEntityReader extends GenericEntityReader {
           createFilterForConfsOrMetricsToRetrieve(
               dataToRetrieve.getMetricsToRetrieve(),
               ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC));
+      cfsInFields.add(
+          Bytes.toString(ApplicationColumnFamily.METRICS.getBytes()));
     }
   }

   @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
+  protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+      throws IOException {
     if (!needCreateFilterListBasedOnFields()) {
       // Fetch all the columns. No need of a filter.
       return null;
@@ -303,8 +310,9 @@ class ApplicationEntityReader extends GenericEntityReader {
       excludeFieldsFromInfoColFamily(infoColFamilyList);
     }
     listBasedOnFields.addFilter(infoColFamilyList);
-    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+    cfsInFields.add(Bytes.toString(ApplicationColumnFamily.INFO.getBytes()));
+    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields);
     return listBasedOnFields;
   }

File: FlowActivityEntityReader.java (org.apache.hadoop.yarn.server.timelineservice.storage.reader)

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -97,7 +98,8 @@ class FlowActivityEntityReader extends TimelineEntityReader {
   }

   @Override
-  protected FilterList constructFilterListBasedOnFields() {
+  protected FilterList constructFilterListBasedOnFields(
+      Set<String> cfsInFields) {
     return null;
   }

File: FlowRunEntityReader.java (org.apache.hadoop.yarn.server.timelineservice.storage.reader)

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Set;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
@@ -152,7 +154,8 @@ class FlowRunEntityReader extends TimelineEntityReader {
   }

   @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
+  protected FilterList constructFilterListBasedOnFields(
+      Set<String> cfsInFields) throws IOException {
     FilterList list = new FilterList(Operator.MUST_PASS_ONE);
     // By default fetch everything in INFO column family.
     FamilyFilter infoColumnFamily =
@@ -166,6 +169,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
         && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
       FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
       infoColFamilyList.addFilter(infoColumnFamily);
+      cfsInFields.add(Bytes.toString(FlowRunColumnFamily.INFO.getBytes()));
       infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
           new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
               .getColumnPrefixBytes(""))));
@@ -182,6 +186,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
         && !metricsToRetrieve.getFilterList().isEmpty()) {
       FilterList infoColFamilyList = new FilterList();
       infoColFamilyList.addFilter(infoColumnFamily);
+      cfsInFields.add(Bytes.toString(FlowRunColumnFamily.INFO.getBytes()));
       FilterList columnsList = updateFixedColumns();
       columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
           FlowRunColumnPrefix.METRIC, metricsToRetrieve));

File: GenericEntityReader.java (org.apache.hadoop.yarn.server.timelineservice.storage.reader)

@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
@@ -349,7 +350,8 @@ class GenericEntityReader extends TimelineEntityReader {
    * @throws IOException if any problem occurs while updating filter list.
    */
   private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
+      FilterList listBasedOnFields, Set<String> cfsInFields)
+      throws IOException {
     TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
     // Please note that if confsToRetrieve is specified, we would have added
     // CONFS to fields to retrieve in augmentParams() even if not specified.
@@ -359,6 +361,7 @@ class GenericEntityReader extends TimelineEntityReader {
           .createFilterForConfsOrMetricsToRetrieve(
               dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
               EntityColumnPrefix.CONFIG));
+      cfsInFields.add(Bytes.toString(EntityColumnFamily.CONFIGS.getBytes()));
     }

     // Please note that if metricsToRetrieve is specified, we would have added
@@ -369,11 +372,13 @@ class GenericEntityReader extends TimelineEntityReader {
           .createFilterForConfsOrMetricsToRetrieve(
               dataToRetrieve.getMetricsToRetrieve(),
               EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
+      cfsInFields.add(Bytes.toString(EntityColumnFamily.METRICS.getBytes()));
     }
   }

   @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
+  protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+      throws IOException {
     if (!needCreateFilterListBasedOnFields()) {
       // Fetch all the columns. No need of a filter.
       return null;
@@ -394,7 +399,8 @@ class GenericEntityReader extends TimelineEntityReader {
       excludeFieldsFromInfoColFamily(infoColFamilyList);
     }
     listBasedOnFields.addFilter(infoColFamilyList);
-    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+    cfsInFields.add(Bytes.toString(EntityColumnFamily.INFO.getBytes()));
+    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields);
     return listBasedOnFields;
   }

File: SubApplicationEntityReader.java (org.apache.hadoop.yarn.server.timelineservice.storage.reader)

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
@@ -247,7 +248,8 @@ class SubApplicationEntityReader extends GenericEntityReader {
    * @throws IOException if any problem occurs while updating filter list.
    */
   private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
+      FilterList listBasedOnFields, Set<String> cfsInFields)
+      throws IOException {
     TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
     // Please note that if confsToRetrieve is specified, we would have added
     // CONFS to fields to retrieve in augmentParams() even if not specified.
@@ -258,6 +260,8 @@ class SubApplicationEntityReader extends GenericEntityReader {
               dataToRetrieve.getConfsToRetrieve(),
               SubApplicationColumnFamily.CONFIGS,
               SubApplicationColumnPrefix.CONFIG));
+      cfsInFields.add(
+          Bytes.toString(SubApplicationColumnFamily.CONFIGS.getBytes()));
     }

     // Please note that if metricsToRetrieve is specified, we would have added
@@ -269,11 +273,14 @@ class SubApplicationEntityReader extends GenericEntityReader {
               dataToRetrieve.getMetricsToRetrieve(),
               SubApplicationColumnFamily.METRICS,
               SubApplicationColumnPrefix.METRIC));
+      cfsInFields.add(
+          Bytes.toString(SubApplicationColumnFamily.METRICS.getBytes()));
     }
   }

   @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
+  protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+      throws IOException {
     if (!needCreateFilterListBasedOnFields()) {
       // Fetch all the columns. No need of a filter.
       return null;
@@ -293,7 +300,9 @@ class SubApplicationEntityReader extends GenericEntityReader {
       excludeFieldsFromInfoColFamily(infoColFamilyList);
     }
     listBasedOnFields.addFilter(infoColFamilyList);
-    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+    cfsInFields.add(
+        Bytes.toString(SubApplicationColumnFamily.INFO.getBytes()));
+    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields);
     return listBasedOnFields;
   }

File: TimelineEntityReader.java (org.apache.hadoop.yarn.server.timelineservice.storage.reader)

@@ -30,11 +30,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -122,11 +127,12 @@ public abstract class TimelineEntityReader extends
    * results fetched from HBase back-end storage. This is called only for
    * multiple entity reads.
    *
+   * @param cfsInFields column families in the fields
    * @return a {@link FilterList} object.
    * @throws IOException if any problem occurs while creating filter list.
    */
-  protected abstract FilterList constructFilterListBasedOnFields()
-      throws IOException;
+  protected abstract FilterList constructFilterListBasedOnFields(
+      Set<String> cfsInFields) throws IOException;

   /**
    * Creates a {@link FilterList} based on info, config and metric filters. This
@@ -151,7 +157,9 @@ public abstract class TimelineEntityReader extends
     FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
     boolean hasListBasedOnFilters = listBasedOnFilters != null &&
         !listBasedOnFilters.getFilters().isEmpty();
-    FilterList listBasedOnFields = constructFilterListBasedOnFields();
+    Set<String> cfsInListBasedOnFields = new HashSet<>(0);
+    FilterList listBasedOnFields =
+        constructFilterListBasedOnFields(cfsInListBasedOnFields);
     boolean hasListBasedOnFields = listBasedOnFields != null &&
         !listBasedOnFields.getFilters().isEmpty();
     // If filter lists based on both filters and fields can be created,
@@ -164,6 +172,21 @@ public abstract class TimelineEntityReader extends
     if (hasListBasedOnFilters && hasListBasedOnFields) {
       FilterList list = new FilterList();
       list.addFilter(listBasedOnFilters);
+
+      Set<String> cfsInListBasedOnFilters = new HashSet<>(0);
+      extractColumnFamiliesFromFiltersBasedOnFilters(
+          listBasedOnFilters, cfsInListBasedOnFilters);
+      // must exclude cfs that are already covered in fields-based filters
+      // otherwise we will return the whole cf
+      cfsInListBasedOnFilters.removeAll(cfsInListBasedOnFields);
+
+      if (!cfsInListBasedOnFilters.isEmpty()) {
+        for (String cf: cfsInListBasedOnFilters) {
+          listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL,
+              new BinaryComparator(Bytes.toBytes(cf))));
+        }
+      }
+
       list.addFilter(listBasedOnFields);
       return list;
     } else if (hasListBasedOnFilters) {
@@ -174,6 +197,21 @@ public abstract class TimelineEntityReader extends
     return null;
   }

+
+  private static void extractColumnFamiliesFromFiltersBasedOnFilters(
+      Filter hbaseFilterBasedOnTLSFilter, Set<String> columnFamilies) {
+    if (hbaseFilterBasedOnTLSFilter instanceof SingleColumnValueFilter) {
+      byte[] cf = ((SingleColumnValueFilter)
+          hbaseFilterBasedOnTLSFilter).getFamily();
+      columnFamilies.add(Bytes.toString(cf));
+    } else if (hbaseFilterBasedOnTLSFilter instanceof FilterList) {
+      FilterList filterListBase = (FilterList) hbaseFilterBasedOnTLSFilter;
+      for (Filter fs: filterListBase.getFilters()) {
+        extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies);
+      }
+    }
+  }
+
   protected TimelineDataToRetrieve getDataToRetrieve() {
     return dataToRetrieve;
   }
@@ -206,7 +244,7 @@ public abstract class TimelineEntityReader extends
     validateParams();
     augmentParams(hbaseConf, conn);

-    FilterList filterList = constructFilterListBasedOnFields();
+    FilterList filterList = constructFilterListBasedOnFields(new HashSet<>(0));
     if (LOG.isDebugEnabled() && filterList != null) {
       LOG.debug("FilterList created for get is - " + filterList);
     }
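To make the effect concrete, here is a hedged usage example of the sketch given above the first file. The family names "i" and "m" and the metric qualifier are illustrative placeholders, not the actual ATSv2 table schema; it only demonstrates how a value filter on a family outside the fields-based list is handled before and after the patch.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public final class FilterCombinationDemo {
  public static void main(String[] args) {
    // Fields-based list: by default only the (illustrative) info family "i"
    // passes, mirroring the MUST_PASS_ONE family lists built by the readers.
    FilterList fields = new FilterList(Operator.MUST_PASS_ONE);
    fields.addFilter(new FamilyFilter(CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("i"))));
    Set<String> cfsInFields = new HashSet<>();
    cfsInFields.add("i");

    // Filters-based list: a value filter on the (illustrative) metrics
    // family "m", as a metric filter would produce.
    FilterList filters = new FilterList(Operator.MUST_PASS_ALL);
    filters.addFilter(new SingleColumnValueFilter(Bytes.toBytes("m"),
        Bytes.toBytes("MEMORY"), CompareOp.GREATER_OR_EQUAL,
        Bytes.toBytes(1024L)));

    // Before the patch the two lists were AND-ed as-is, so cells in "m"
    // never survived the fields-based list; after the patch an extra
    // FamilyFilter for "m" is appended to the fields-based list first.
    FilterList combined =
        FilterCombinationSketch.combine(filters, fields, cfsInFields);
    System.out.println(combined);
  }
}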