YARN-3862. Support for fetching specific configs and metrics based on prefixes (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2015-12-01 21:47:43 -08:00
parent ae72f1dc77
commit 1f710484e5
26 changed files with 1758 additions and 149 deletions

View File

@ -77,7 +77,7 @@ public class TimelineReaderManager extends AbstractService {
return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
metricFilters, eventFilters, fieldsToRetrieve);
metricFilters, eventFilters, null, null, fieldsToRetrieve);
}
/**
@ -91,6 +91,6 @@ public class TimelineReaderManager extends AbstractService {
String entityId, EnumSet<Field> fields) throws IOException {
String cluster = getClusterID(clusterId, getConfig());
return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
entityType, entityId, fields);
entityType, entityId, null, null, fields);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Filter class which represents filter to be applied based on key-value pair
* and the relation between them represented by different relational operators.
*/
@Private
@Unstable
public class TimelineCompareFilter extends TimelineFilter {
private TimelineCompareOp compareOp;
private String key;
private Object value;
public TimelineCompareFilter() {
}
public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) {
this.compareOp = op;
this.key = key;
this.value = val;
}
@Override
public TimelineFilterType getFilterType() {
return TimelineFilterType.COMPARE;
}
public TimelineCompareOp getCompareOp() {
return compareOp;
}
public String getKey() {
return key;
}
public Object getValue() {
return value;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Comparison Operators.
*/
@Private
@Unstable
public enum TimelineCompareOp {
LESS_THAN,
LESS_OR_EQUAL,
EQUAL,
NOT_EQUAL,
GREATER_OR_EQUAL,
GREATER_THAN
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Abstract base class extended to implement timeline filters.
*/
@Private
@Unstable
public abstract class TimelineFilter {
/**
* Lists the different filter types.
*/
@Private
@Unstable
public enum TimelineFilterType {
/**
* Combines multiple filters.
*/
LIST,
/**
* Filter which is used for comparison.
*/
COMPARE,
/**
* Filter which matches prefix for a config or a metric.
*/
PREFIX
}
public abstract TimelineFilterType getFilterType();
public String toString() {
return this.getClass().getSimpleName();
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Implementation of {@link TimelineFilter} that represents an ordered list of
* timeline filters which will then be evaluated with a specified boolean
* operator {@link Operator#AND} or {@link Operator#OR}. Since you can use
* timeline filter lists as children of timeline filter lists, you can create a
* hierarchy of filters to be evaluated.
*/
@Private
@Unstable
public class TimelineFilterList extends TimelineFilter {
/**
* Specifies how filters in the filter list will be evaluated. AND means all
* the filters should match and OR means atleast one should match.
*/
@Private
@Unstable
public static enum Operator {
AND,
OR
}
private Operator operator;
private List<TimelineFilter> filterList = new ArrayList<TimelineFilter>();
public TimelineFilterList(TimelineFilter...filters) {
this(Operator.AND, filters);
}
public TimelineFilterList(Operator op, TimelineFilter...filters) {
this.operator = op;
this.filterList = new ArrayList<TimelineFilter>(Arrays.asList(filters));
}
@Override
public TimelineFilterType getFilterType() {
return TimelineFilterType.LIST;
}
/**
* Get the filter list.
*
* @return filterList
*/
public List<TimelineFilter> getFilterList() {
return filterList;
}
/**
* Get the operator.
*
* @return operator
*/
public Operator getOperator() {
return operator;
}
public void setOperator(Operator op) {
operator = op;
}
public void addFilter(TimelineFilter filter) {
filterList.add(filter);
}
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.hbase.filter.QualifierFilter;
/**
* Set of utility methods used by timeline filter classes.
*/
public final class TimelineFilterUtils {
private TimelineFilterUtils() {
}
/**
* Returns the equivalent HBase filter list's {@link Operator}.
* @param op
* @return HBase filter list's Operator.
*/
private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
switch (op) {
case AND:
return Operator.MUST_PASS_ALL;
case OR:
return Operator.MUST_PASS_ONE;
default:
throw new IllegalArgumentException("Invalid operator");
}
}
/**
* Returns the equivalent HBase compare filter's {@link CompareOp}.
* @param op
* @return HBase compare filter's CompareOp.
*/
private static CompareOp getHBaseCompareOp(
TimelineCompareOp op) {
switch (op) {
case LESS_THAN:
return CompareOp.LESS;
case LESS_OR_EQUAL:
return CompareOp.LESS_OR_EQUAL;
case EQUAL:
return CompareOp.EQUAL;
case NOT_EQUAL:
return CompareOp.NOT_EQUAL;
case GREATER_OR_EQUAL:
return CompareOp.GREATER_OR_EQUAL;
case GREATER_THAN:
return CompareOp.GREATER;
default:
throw new IllegalArgumentException("Invalid compare operator");
}
}
/**
* Converts a {@link TimelinePrefixFilter} to an equivalent HBase
* {@link QualifierFilter}.
* @param colPrefix
* @param filter
* @return a {@link QualifierFilter} object
*/
private static <T> Filter createHBaseColQualPrefixFilter(
ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
new BinaryPrefixComparator(
colPrefix.getColumnPrefixBytes(filter.getPrefix())));
}
/**
* Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
* while converting different timeline filters(of type {@link TimelineFilter})
* into their equivalent HBase filters.
* @param colPrefix
* @param filterList
* @return a {@link FilterList} object
*/
public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
TimelineFilterList filterList) {
FilterList list =
new FilterList(getHBaseOperator(filterList.getOperator()));
for (TimelineFilter filter : filterList.getFilterList()) {
switch(filter.getFilterType()) {
case LIST:
list.addFilter(
createHBaseFilterList(colPrefix, (TimelineFilterList)filter));
break;
case PREFIX:
list.addFilter(createHBaseColQualPrefixFilter(
colPrefix, (TimelinePrefixFilter)filter));
break;
default:
break;
}
}
return list;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Filter class which represents filter to be applied based on prefixes.
* Prefixes can either match or not match.
*/
@Private
@Unstable
public class TimelinePrefixFilter extends TimelineFilter {
private TimelineCompareOp compareOp;
private String prefix;
public TimelinePrefixFilter(TimelineCompareOp op, String prefix) {
this.prefix = prefix;
if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
throw new IllegalArgumentException("CompareOp for prefix filter should " +
"be EQUAL or NOT_EQUAL");
}
this.compareOp = op;
}
@Override
public TimelineFilterType getFilterType() {
return TimelineFilterType.PREFIX;
}
public String getPrefix() {
return prefix;
}
public TimelineCompareOp getCompareOp() {
return compareOp;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.server.timelineservice.reader.filter stores
* timeline filter implementations.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -28,11 +28,21 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
@ -56,18 +66,21 @@ class ApplicationEntityReader extends GenericEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve, true);
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
true);
}
public ApplicationEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
String entityId, TimelineFilterList confsToRetrieve,
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
/**
@ -78,13 +91,95 @@ class ApplicationEntityReader extends GenericEntityReader {
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
protected FilterList constructFilterListBasedOnFields() {
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
// Fetch all the columns.
if (fieldsToRetrieve.contains(Field.ALL) &&
(confsToRetrieve == null ||
confsToRetrieve.getFilterList().isEmpty()) &&
(metricsToRetrieve == null ||
metricsToRetrieve.getFilterList().isEmpty())) {
return list;
}
FilterList infoColFamilyList = new FilterList();
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
infoColFamilyList.addFilter(infoColumnFamily);
// Events not required.
if (!fieldsToRetrieve.contains(Field.EVENTS) &&
!fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
}
// info not required.
if (!fieldsToRetrieve.contains(Field.INFO) &&
!fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
}
// is releated to not required.
if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
!fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
}
// relates to not required.
if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
!fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
}
list.addFilter(infoColFamilyList);
if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
(confsToRetrieve != null &&
!confsToRetrieve.getFilterList().isEmpty())) {
FilterList filterCfg =
new FilterList(new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
if (confsToRetrieve != null &&
!confsToRetrieve.getFilterList().isEmpty()) {
filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
ApplicationColumnPrefix.CONFIG, confsToRetrieve));
}
list.addFilter(filterCfg);
}
if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
(metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty())) {
FilterList filterMetrics =
new FilterList(new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
if (metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty()) {
filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
ApplicationColumnPrefix.METRIC, metricsToRetrieve));
}
list.addFilter(filterMetrics);
}
return list;
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
byte[] rowKey =
ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList);
}
return table.getResult(hbaseConf, conn, get);
}
@ -115,6 +210,15 @@ class ApplicationEntityReader extends GenericEntityReader {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
fieldsToRetrieve.add(Field.CONFIGS);
}
if (!fieldsToRetrieve.contains(Field.METRICS) &&
metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty()) {
fieldsToRetrieve.add(Field.METRICS);
}
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
@ -136,7 +240,7 @@ class ApplicationEntityReader extends GenericEntityReader {
@Override
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
if (flowRunId != null) {
scan.setRowPrefixFilter(ApplicationRowKey.
@ -145,7 +249,12 @@ class ApplicationEntityReader extends GenericEntityReader {
scan.setRowPrefixFilter(ApplicationRowKey.
getRowKeyPrefix(clusterId, userId, flowId));
}
scan.setFilter(new PageFilter(limit));
FilterList newList = new FilterList();
newList.addFilter(new PageFilter(limit));
if (filterList != null && !filterList.getFilters().isEmpty()) {
newList.addFilter(filterList);
}
scan.setFilter(newList);
return table.getResultScanner(hbaseConf, conn, scan);
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
@ -272,6 +273,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
if (limit == null || limit <= 0) {
limit = DEFAULT_LIMIT;
@ -386,7 +388,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
@Override
public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException {
String entityId, TimelineFilterList confsToRetrieve,
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
throws IOException {
String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
flowRunId, appId);
File dir = new File(new File(rootPath, ENTITIES_DIR),
@ -413,6 +417,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
String flowRunPath =
getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
@ -422,6 +427,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
return getEntities(dir, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve);
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@ -58,14 +59,14 @@ class FlowActivityEntityReader extends TimelineEntityReader {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve, true);
eventFilters, null, null, fieldsToRetrieve, true);
}
public FlowActivityEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
null, null, fieldsToRetrieve);
}
/**
@ -96,15 +97,20 @@ class FlowActivityEntityReader extends TimelineEntityReader {
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
protected FilterList constructFilterListBasedOnFields() {
return null;
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
throw new UnsupportedOperationException(
"we don't support a single entity query");
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
createdTimeEnd == DEFAULT_END_TIME) {

View File

@ -28,12 +28,22 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
@ -54,18 +64,20 @@ class FlowRunEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve, true);
eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
}
public FlowRunEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
String entityId, TimelineFilterList confsToRetrieve,
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
null, metricsToRetrieve, fieldsToRetrieve);
}
/**
@ -101,26 +113,69 @@ class FlowRunEntityReader extends TimelineEntityReader {
if (createdTimeEnd == null) {
createdTimeEnd = DEFAULT_END_TIME;
}
if (!fieldsToRetrieve.contains(Field.METRICS) &&
metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty()) {
fieldsToRetrieve.add(Field.METRICS);
}
}
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
protected FilterList constructFilterListBasedOnFields() {
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
// Metrics not required.
if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
!fieldsToRetrieve.contains(Field.ALL)) {
FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
infoColFamilyList.addFilter(infoColumnFamily);
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
list.addFilter(infoColFamilyList);
}
if (metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty()) {
FilterList infoColFamilyList = new FilterList();
infoColFamilyList.addFilter(infoColumnFamily);
infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
FlowRunColumnPrefix.METRIC, metricsToRetrieve));
list.addFilter(infoColFamilyList);
}
return list;
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
byte[] rowKey =
FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList);
}
return table.getResult(hbaseConf, conn, get);
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
scan.setRowPrefixFilter(
FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId));
scan.setFilter(new PageFilter(limit));
FilterList newList = new FilterList();
newList.addFilter(new PageFilter(limit));
if (filterList != null && !filterList.getFilters().isEmpty()) {
newList.addFilter(filterList);
}
scan.setFilter(newList);
return table.getResultScanner(hbaseConf, conn, scan);
}

View File

@ -32,9 +32,18 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
@ -46,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@ -72,18 +82,21 @@ class GenericEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
eventFilters, fieldsToRetrieve, sortedKeys);
eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
sortedKeys);
}
public GenericEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
String entityId, TimelineFilterList confsToRetrieve,
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
fieldsToRetrieve);
confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
/**
@ -93,6 +106,85 @@ class GenericEntityReader extends TimelineEntityReader {
return ENTITY_TABLE;
}
@Override
protected FilterList constructFilterListBasedOnFields() {
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
// Fetch all the columns.
if (fieldsToRetrieve.contains(Field.ALL) &&
(confsToRetrieve == null ||
confsToRetrieve.getFilterList().isEmpty()) &&
(metricsToRetrieve == null ||
metricsToRetrieve.getFilterList().isEmpty())) {
return list;
}
FilterList infoColFamilyList = new FilterList();
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
infoColFamilyList.addFilter(infoColumnFamily);
// Events not required.
if (!fieldsToRetrieve.contains(Field.EVENTS) &&
!fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
}
// info not required.
if (!fieldsToRetrieve.contains(Field.INFO) &&
!fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
}
// is related to not required.
if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
!fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
}
// relates to not required.
if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
!fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
infoColFamilyList.addFilter(
new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(
EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
}
list.addFilter(infoColFamilyList);
if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
(confsToRetrieve != null &&
!confsToRetrieve.getFilterList().isEmpty())) {
FilterList filterCfg =
new FilterList(new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
if (confsToRetrieve != null &&
!confsToRetrieve.getFilterList().isEmpty()) {
filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
EntityColumnPrefix.CONFIG, confsToRetrieve));
}
list.addFilter(filterCfg);
}
if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
(metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty())) {
FilterList filterMetrics =
new FilterList(new FamilyFilter(CompareOp.EQUAL,
new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
if (metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty()) {
filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
EntityColumnPrefix.METRIC, metricsToRetrieve));
}
list.addFilter(filterMetrics);
}
return list;
}
protected FlowContext lookupFlowContext(String clusterId, String appId,
Configuration hbaseConf, Connection conn) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
@ -145,6 +237,15 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
fieldsToRetrieve.add(Field.CONFIGS);
}
if (!fieldsToRetrieve.contains(Field.METRICS) &&
metricsToRetrieve != null &&
!metricsToRetrieve.getFilterList().isEmpty()) {
fieldsToRetrieve.add(Field.METRICS);
}
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
@ -165,25 +266,31 @@ class GenericEntityReader extends TimelineEntityReader {
}
@Override
protected Result getResult(Configuration hbaseConf, Connection conn)
throws IOException {
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
byte[] rowKey =
EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
entityType, entityId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList);
}
return table.getResult(hbaseConf, conn, get);
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
Connection conn, FilterList filterList) throws IOException {
// Scan through part of the table to find the entities belong to one app
// and one type
Scan scan = new Scan();
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
clusterId, userId, flowId, flowRunId, appId, entityType));
scan.setMaxVersions(Integer.MAX_VALUE);
if (filterList != null && !filterList.getFilters().isEmpty()) {
scan.setFilter(filterList);
}
return table.getResultScanner(hbaseConf, conn, scan);
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader {
@ -64,11 +65,13 @@ public class HBaseTimelineReaderImpl
@Override
public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve)
String entityId, TimelineFilterList confsToRetrieve,
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
throws IOException {
TimelineEntityReader reader =
TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
flowId, flowRunId, appId, entityType, entityId, confsToRetrieve,
metricsToRetrieve, fieldsToRetrieve);
return reader.readEntity(hbaseConf, conn);
}
@ -80,13 +83,15 @@ public class HBaseTimelineReaderImpl
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
metricFilters, eventFilters, fieldsToRetrieve);
metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve,
fieldsToRetrieve);
return reader.readEntities(hbaseConf, conn);
}
}

View File

@ -31,8 +31,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
@ -70,6 +72,8 @@ abstract class TimelineEntityReader {
protected Map<String, String> configFilters;
protected Set<String> metricFilters;
protected Set<String> eventFilters;
protected TimelineFilterList confsToRetrieve;
protected TimelineFilterList metricsToRetrieve;
/**
* Main table the entity reader uses.
@ -94,6 +98,7 @@ abstract class TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
this.singleEntityRead = false;
this.sortedKeys = sortedKeys;
@ -115,6 +120,8 @@ abstract class TimelineEntityReader {
this.configFilters = configFilters;
this.metricFilters = metricFilters;
this.eventFilters = eventFilters;
this.confsToRetrieve = confsToRetrieve;
this.metricsToRetrieve = metricsToRetrieve;
this.table = getTable();
}
@ -124,7 +131,8 @@ abstract class TimelineEntityReader {
*/
protected TimelineEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
String entityId, TimelineFilterList confsToRetrieve,
TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
this.singleEntityRead = true;
this.userId = userId;
this.clusterId = clusterId;
@ -134,10 +142,20 @@ abstract class TimelineEntityReader {
this.entityType = entityType;
this.fieldsToRetrieve = fieldsToRetrieve;
this.entityId = entityId;
this.confsToRetrieve = confsToRetrieve;
this.metricsToRetrieve = metricsToRetrieve;
this.table = getTable();
}
/**
* Creates a {@link FilterList} based on fields, confs and metrics to
* retrieve. This filter list will be set in Scan/Get objects to trim down
* results fetched from HBase back-end storage.
* @return a {@link FilterList} object.
*/
protected abstract FilterList constructFilterListBasedOnFields();
/**
* Reads and deserializes a single timeline entity from the HBase storage.
*/
@ -146,7 +164,8 @@ abstract class TimelineEntityReader {
validateParams();
augmentParams(hbaseConf, conn);
Result result = getResult(hbaseConf, conn);
FilterList filterList = constructFilterListBasedOnFields();
Result result = getResult(hbaseConf, conn, filterList);
if (result == null || result.isEmpty()) {
// Could not find a matching row.
LOG.info("Cannot find matching entity of type " + entityType);
@ -166,7 +185,8 @@ abstract class TimelineEntityReader {
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
ResultScanner results = getResults(hbaseConf, conn);
FilterList filterList = constructFilterListBasedOnFields();
ResultScanner results = getResults(hbaseConf, conn, filterList);
try {
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
@ -211,14 +231,14 @@ abstract class TimelineEntityReader {
*
* @return the {@link Result} instance or null if no such record is found.
*/
protected abstract Result getResult(Configuration hbaseConf, Connection conn)
throws IOException;
protected abstract Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException;
/**
* Fetches a {@link ResultScanner} for a multi-entity read.
*/
protected abstract ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException;
Connection conn, FilterList filterList) throws IOException;
/**
* Given a {@link Result} instance, deserializes and creates a

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
/**
@ -34,22 +35,23 @@ class TimelineEntityReaderFactory {
*/
public static TimelineEntityReader createSingleEntityReader(String userId,
String clusterId, String flowId, Long flowRunId, String appId,
String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
String entityType, String entityId, TimelineFilterList confs,
TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
}
}
@ -64,6 +66,7 @@ class TimelineEntityReaderFactory {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confs, TimelineFilterList metrics,
EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
@ -71,8 +74,8 @@ class TimelineEntityReaderFactory {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve);
infoFilters, configFilters, metricFilters, eventFilters, confs,
metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
@ -83,15 +86,15 @@ class TimelineEntityReaderFactory {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve);
infoFilters, configFilters, metricFilters, eventFilters, confs,
metrics, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
fieldsToRetrieve, false);
infoFilters, configFilters, metricFilters, eventFilters, confs,
metrics, fieldsToRetrieve, false);
}
}
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
/** ATSv2 reader interface. */
@Private
@ -70,6 +72,18 @@ public interface TimelineReader extends Service {
* Entity type (mandatory)
* @param entityId
* Entity Id (mandatory)
* @param confsToRetrieve
* Used for deciding which configs to return in response. This is
* represented as a {@link TimelineFilterList} object containing
* {@link TimelinePrefixFilter} objects. These can either be exact config
* keys' or prefixes which are then compared against config keys' to decide
* configs to return in response.
* @param metricsToRetrieve
* Used for deciding which metrics to return in response. This is
* represented as a {@link TimelineFilterList} object containing
* {@link TimelinePrefixFilter} objects. These can either be exact metric
* ids' or prefixes which are then compared against metric ids' to decide
* metrics to return in response.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve(optional), see
* {@link Field}. If null, retrieves 4 fields namely entity id,
@ -81,6 +95,7 @@ public interface TimelineReader extends Service {
*/
TimelineEntity getEntity(String userId, String clusterId, String flowId,
Long flowRunId, String appId, String entityType, String entityId,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
@ -139,6 +154,22 @@ public interface TimelineReader extends Service {
* @param eventFilters
* Matched entities should contain the given events (optional). If null
* or empty, the filter is not applied.
* @param confsToRetrieve
* Used for deciding which configs to return in response. This is
* represented as a {@link TimelineFilterList} object containing
* {@link TimelinePrefixFilter} objects. These can either be exact config
* keys' or prefixes which are then compared against config keys' to decide
* configs(inside entities) to return in response. This should not be
* confused with configFilters which is used to decide which entities to
* return instead.
* @param metricsToRetrieve
* Used for deciding which metrics to return in response. This is
* represented as a {@link TimelineFilterList} object containing
* {@link TimelinePrefixFilter} objects. These can either be exact metric
* ids' or prefixes which are then compared against metric ids' to decide
* metrics(inside entities) to return in response. This should not be
* confused with metricFilters which is used to decide which entities to
* return instead.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve(optional), see
* {@link Field}. If null, retrieves 4 fields namely entity id,
@ -158,5 +189,6 @@ public interface TimelineReader extends Service {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException;
}

View File

@ -119,6 +119,18 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return columnPrefix;
}
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
/*
* (non-Javadoc)
*
@ -139,8 +151,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
@ -166,8 +177,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);

View File

@ -44,13 +44,13 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
*@param attributes attributes for the mutation that are used by the coprocessor
* to set/read the cell tags
* @param attributes attributes for the mutation that are used by the
* coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
byte[] qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
@ -65,13 +65,13 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
*@param attributes attributes for the mutation that are used by the coprocessor
* to set/read the cell tags
* @param attributes attributes for the mutation that are used by the
* coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
String qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
@ -86,7 +86,7 @@ public interface ColumnPrefix<T> {
* in the result.
* @throws IOException
*/
public Object readResult(Result result, String qualifier) throws IOException;
Object readResult(Result result, String qualifier) throws IOException;
/**
* @param result from which to read columns
@ -94,7 +94,7 @@ public interface ColumnPrefix<T> {
* (or all of them if the prefix value is null).
* @throws IOException
*/
public Map<String, Object> readResults(Result result) throws IOException;
Map<String, Object> readResults(Result result) throws IOException;
/**
* @param result from which to reads data with timestamps
@ -104,7 +104,18 @@ public interface ColumnPrefix<T> {
* idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException
*/
public <V> NavigableMap<String, NavigableMap<Long, V>>
<V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
/**
* @param qualifierPrefix Column qualifier or prefix of qualifier.
* @return a byte array encoding column prefix and qualifier/prefix passed.
*/
byte[] getColumnPrefixBytes(String qualifierPrefix);
/**
* @param qualifierPrefix Column qualifier or prefix of qualifier.
* @return a byte array encoding column prefix and qualifier/prefix passed.
*/
byte[] getColumnPrefixBytes(byte[] qualifierPrefix);
}

View File

@ -119,6 +119,18 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return columnPrefix;
}
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
/*
* (non-Javadoc)
*
@ -140,8 +152,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
@ -167,8 +178,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);

View File

@ -83,6 +83,18 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
return columnPrefix;
}
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
public byte[] getColumnPrefixBytes() {
return columnPrefixBytes.clone();
}
@ -112,8 +124,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
@ -233,8 +244,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,

View File

@ -89,8 +89,16 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
return columnPrefixBytes.clone();
}
public byte[] getColumnPrefixBytes(String qualifier) {
return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifierPrefix);
}
public byte[] getColumnFamilyBytes() {
@ -121,8 +129,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
@ -149,8 +156,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,

View File

@ -266,7 +266,7 @@ public class TestFileSystemTimelineReaderImpl {
// only the id, created and modified time
TimelineEntity result =
reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
"app", "id_1", null);
"app", "id_1", null, null, null);
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString());
@ -281,7 +281,7 @@ public class TestFileSystemTimelineReaderImpl {
// Cluster and AppId should be enough to get an entity.
TimelineEntity result =
reader.getEntity(null, "cluster1", null, null, "app1",
"app", "id_1", null);
"app", "id_1", null, null, null);
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString());
@ -298,7 +298,7 @@ public class TestFileSystemTimelineReaderImpl {
// in app flow mapping csv has commas.
TimelineEntity result =
reader.getEntity(null, "cluster1", null, null, "app2",
"app", "id_5", null);
"app", "id_5", null, null, null);
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_5")).toString(),
result.getIdentifier().toString());
@ -311,7 +311,7 @@ public class TestFileSystemTimelineReaderImpl {
// Specified fields in addition to default view will be returned.
TimelineEntity result =
reader.getEntity("user1", "cluster1", "flow1", 1L,
"app1", "app", "id_1",
"app1", "app", "id_1", null, null,
EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS));
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
@ -329,8 +329,8 @@ public class TestFileSystemTimelineReaderImpl {
public void testGetEntityAllFields() throws Exception {
// All fields of TimelineEntity will be returned.
TimelineEntity result =
reader.getEntity("user1", "cluster1", "flow1", 1L,
"app1", "app", "id_1", EnumSet.of(Field.ALL));
reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", "app",
"id_1", null, null, EnumSet.of(Field.ALL));
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString());
@ -347,7 +347,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, null, null,
null, null);
null, null, null, null);
// All 3 entities will be returned
Assert.assertEquals(4, result.size());
}
@ -357,7 +357,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
2L, null, null, null, null, null, null, null, null, null,
null, null);
null, null, null, null);
Assert.assertEquals(2, result.size());
// Needs to be rewritten once hashcode and equals for
// TimelineEntity is implemented
@ -371,7 +371,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
3L, null, null, null, null, null, null, null, null, null,
null, null);
null, null, null, null);
// Even though 2 entities out of 4 have same created time, one entity
// is left out due to limit
Assert.assertEquals(3, result.size());
@ -383,7 +383,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, 1425016502030L, 1425016502060L, null, null, null, null, null,
null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, result.size());
// Only one entity with ID id_4 should be returned.
for (TimelineEntity entity : result) {
@ -396,7 +396,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, 1425016502010L, null, null, null, null, null, null,
null, null, null);
null, null, null, null, null);
Assert.assertEquals(3, result.size());
for (TimelineEntity entity : result) {
if (entity.getId().equals("id_4")) {
@ -408,7 +408,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, 1425016502010L, null, null, null, null, null, null, null,
null, null, null);
null, null, null, null, null);
Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_4")) {
@ -420,7 +420,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, 1425016502090L, 1425016503020L, null, null, null,
null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_4 should be returned.
for (TimelineEntity entity : result) {
@ -433,7 +433,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, 1425016502090L, null, null, null, null,
null, null, null);
null, null, null, null, null);
Assert.assertEquals(2, result.size());
for (TimelineEntity entity : result) {
if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
@ -445,7 +445,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, 1425016503005L, null, null, null, null, null,
null, null, null);
null, null, null, null, null);
Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_4")) {
@ -462,7 +462,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, infoFilters, null, null,
null, null);
null, null, null, null);
Assert.assertEquals(1, result.size());
// Only one entity with ID id_3 should be returned.
for (TimelineEntity entity : result) {
@ -478,7 +478,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, configFilters, null,
null, null);
null, null, null, null);
Assert.assertEquals(2, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
@ -493,7 +493,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, null, null,
eventFilters, null);
eventFilters, null, null, null);
Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_3")) {
@ -507,7 +507,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, null, metricFilters,
null, null);
null, null, null, null);
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_2 should be returned.
for (TimelineEntity entity : result) {
@ -527,7 +527,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, relatesTo, null, null, null, null,
null, null);
null, null, null, null);
Assert.assertEquals(1, result.size());
// Only one entity with ID id_1 should be returned.
for (TimelineEntity entity : result) {
@ -544,7 +544,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, isRelatedTo, null, null, null,
null, null);
null, null, null, null);
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_3 should be returned.
for (TimelineEntity entity : result) {

View File

@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@ -60,11 +65,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
/**
* Various tests to test writing entities to HBase and reading them back from
* it.
@ -79,18 +90,344 @@ import org.junit.Test;
public class TestHBaseTimelineStorage {
private static HBaseTestingUtility util;
private HBaseTimelineReaderImpl reader;
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
util.startMiniCluster();
createSchema();
loadEntities();
loadApps();
}
private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
private static void loadApps() throws IOException {
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
String id = "application_1111111111_2222";
entity.setId(id);
entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
Long cTime = 1425016501000L;
Long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime);
// add the info map in Timeline Entity
Map<String, Object> infoMap = new HashMap<String, Object>();
infoMap.put("infoMapKey1", "infoMapValue1");
infoMap.put("infoMapKey2", 10);
entity.addInfo(infoMap);
// add the isRelatedToEntity info
String key = "task";
String value = "is_related_to_entity_id_here";
Set<String> isRelatedToSet = new HashSet<String>();
isRelatedToSet.add(value);
Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
isRelatedTo.put(key, isRelatedToSet);
entity.setIsRelatedToEntities(isRelatedTo);
// add the relatesTo info
key = "container";
value = "relates_to_entity_id_here";
Set<String> relatesToSet = new HashSet<String>();
relatesToSet.add(value);
value = "relates_to_entity_id_here_Second";
relatesToSet.add(value);
Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
relatesTo.put(key, relatesToSet);
entity.setRelatesToEntities(relatesTo);
// add some config entries
Map<String, String> conf = new HashMap<String, String>();
conf.put("config_param1", "value1");
conf.put("config_param2", "value2");
conf.put("cfg_param1", "value3");
entity.addConfigs(conf);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS");
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricValues.put(ts - 120000, 100000000);
metricValues.put(ts - 100000, 200000000);
metricValues.put(ts - 80000, 300000000);
metricValues.put(ts - 60000, 400000000);
metricValues.put(ts - 40000, 50000000000L);
metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m12 = new TimelineMetric();
m12.setId("MAP1_BYTES");
m12.addValue(ts, 50);
metrics.add(m12);
entity.addMetrics(metrics);
TimelineEvent event = new TimelineEvent();
event.setId("event1");
event.setTimestamp(ts - 2000);
entity.addEvent(event);
te.addEntity(entity);
TimelineEntities te1 = new TimelineEntities();
TimelineEntity entity1 = new TimelineEntity();
String id1 = "application_1111111111_3333";
entity1.setId(id1);
entity1.setType(TimelineEntityType.YARN_APPLICATION.toString());
entity1.setCreatedTime(cTime);
entity1.setModifiedTime(mTime);
// add the info map in Timeline Entity
Map<String, Object> infoMap1 = new HashMap<String, Object>();
infoMap1.put("infoMapKey1", "infoMapValue1");
infoMap1.put("infoMapKey2", 10);
entity1.addInfo(infoMap1);
// add the isRelatedToEntity info
String key1 = "task";
String value1 = "is_related_to_entity_id_here";
Set<String> isRelatedToSet1 = new HashSet<String>();
isRelatedToSet1.add(value1);
Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
isRelatedTo1.put(key, isRelatedToSet1);
entity1.setIsRelatedToEntities(isRelatedTo1);
// add the relatesTo info
key1 = "container";
value1 = "relates_to_entity_id_here";
Set<String> relatesToSet1 = new HashSet<String>();
relatesToSet1.add(value1);
value1 = "relates_to_entity_id_here_Second";
relatesToSet1.add(value1);
Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
relatesTo1.put(key1, relatesToSet1);
entity1.setRelatesToEntities(relatesTo1);
// add some config entries
Map<String, String> conf1 = new HashMap<String, String>();
conf1.put("cfg_param1", "value1");
conf1.put("cfg_param2", "value2");
entity1.addConfigs(conf1);
// add metrics
Set<TimelineMetric> metrics1 = new HashSet<>();
TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP1_SLOT_MILLIS");
Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
long ts1 = System.currentTimeMillis();
metricValues1.put(ts1 - 120000, 100000000);
metricValues1.put(ts1 - 100000, 200000000);
metricValues1.put(ts1 - 80000, 300000000);
metricValues1.put(ts1 - 60000, 400000000);
metricValues1.put(ts1 - 40000, 50000000000L);
metricValues1.put(ts1 - 20000, 60000000000L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues1);
metrics1.add(m2);
entity1.addMetrics(metrics1);
te1.addEntity(entity1);
TimelineEntities te2 = new TimelineEntities();
TimelineEntity entity2 = new TimelineEntity();
String id2 = "application_1111111111_4444";
entity2.setId(id2);
entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
entity2.setCreatedTime(cTime);
entity2.setModifiedTime(mTime);
te2.addEntity(entity2);
HBaseTimelineWriterImpl hbi = null;
try {
hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
hbi.init(util.getConfiguration());
hbi.start();
String cluster = "cluster1";
String user = "user1";
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
String appName = "application_1111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
appName = "application_1111111111_3333";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
appName = "application_1111111111_4444";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te2);
hbi.stop();
} finally {
if (hbi != null) {
hbi.stop();
hbi.close();
}
}
}
private static void loadEntities() throws IOException {
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
String id = "hello";
String type = "world";
entity.setId(id);
entity.setType(type);
Long cTime = 1425016501000L;
Long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime);
// add the info map in Timeline Entity
Map<String, Object> infoMap = new HashMap<String, Object>();
infoMap.put("infoMapKey1", "infoMapValue1");
infoMap.put("infoMapKey2", 10);
entity.addInfo(infoMap);
// add the isRelatedToEntity info
String key = "task";
String value = "is_related_to_entity_id_here";
Set<String> isRelatedToSet = new HashSet<String>();
isRelatedToSet.add(value);
Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
isRelatedTo.put(key, isRelatedToSet);
entity.setIsRelatedToEntities(isRelatedTo);
// add the relatesTo info
key = "container";
value = "relates_to_entity_id_here";
Set<String> relatesToSet = new HashSet<String>();
relatesToSet.add(value);
value = "relates_to_entity_id_here_Second";
relatesToSet.add(value);
Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
relatesTo.put(key, relatesToSet);
entity.setRelatesToEntities(relatesTo);
// add some config entries
Map<String, String> conf = new HashMap<String, String>();
conf.put("config_param1", "value1");
conf.put("config_param2", "value2");
conf.put("cfg_param1", "value3");
entity.addConfigs(conf);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId("MAP_SLOT_MILLIS");
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricValues.put(ts - 120000, 100000000);
metricValues.put(ts - 100000, 200000000);
metricValues.put(ts - 80000, 300000000);
metricValues.put(ts - 60000, 400000000);
metricValues.put(ts - 40000, 50000000000L);
metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m12 = new TimelineMetric();
m12.setId("MAP1_BYTES");
m12.addValue(ts, 50);
metrics.add(m12);
entity.addMetrics(metrics);
te.addEntity(entity);
TimelineEntity entity1 = new TimelineEntity();
String id1 = "hello1";
entity1.setId(id1);
entity1.setType(type);
entity1.setCreatedTime(cTime);
entity1.setModifiedTime(mTime);
// add the info map in Timeline Entity
Map<String, Object> infoMap1 = new HashMap<String, Object>();
infoMap1.put("infoMapKey1", "infoMapValue1");
infoMap1.put("infoMapKey2", 10);
entity1.addInfo(infoMap1);
// add the isRelatedToEntity info
String key1 = "task";
String value1 = "is_related_to_entity_id_here";
Set<String> isRelatedToSet1 = new HashSet<String>();
isRelatedToSet1.add(value1);
Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
isRelatedTo1.put(key, isRelatedToSet1);
entity1.setIsRelatedToEntities(isRelatedTo1);
// add the relatesTo info
key1 = "container";
value1 = "relates_to_entity_id_here";
Set<String> relatesToSet1 = new HashSet<String>();
relatesToSet1.add(value1);
value1 = "relates_to_entity_id_here_Second";
relatesToSet1.add(value1);
Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
relatesTo1.put(key1, relatesToSet1);
entity1.setRelatesToEntities(relatesTo1);
// add some config entries
Map<String, String> conf1 = new HashMap<String, String>();
conf1.put("cfg_param1", "value1");
conf1.put("cfg_param2", "value2");
entity1.addConfigs(conf1);
// add metrics
Set<TimelineMetric> metrics1 = new HashSet<>();
TimelineMetric m2 = new TimelineMetric();
m2.setId("MAP1_SLOT_MILLIS");
Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
long ts1 = System.currentTimeMillis();
metricValues1.put(ts1 - 120000, 100000000);
metricValues1.put(ts1 - 100000, 200000000);
metricValues1.put(ts1 - 80000, 300000000);
metricValues1.put(ts1 - 60000, 400000000);
metricValues1.put(ts1 - 40000, 50000000000L);
metricValues1.put(ts1 - 20000, 60000000000L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues1);
metrics1.add(m2);
entity1.addMetrics(metrics1);
te.addEntity(entity1);
TimelineEntity entity2 = new TimelineEntity();
String id2 = "hello2";
entity2.setId(id2);
entity2.setType(type);
entity2.setCreatedTime(cTime);
entity2.setModifiedTime(mTime);
te.addEntity(entity2);
HBaseTimelineWriterImpl hbi = null;
try {
hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
hbi.init(util.getConfiguration());
hbi.start();
String cluster = "cluster1";
String user = "user1";
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
String appName = "application_1231111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.stop();
} finally {
if (hbi != null) {
hbi.stop();
hbi.close();
}
}
}
@Before
public void init() throws Exception {
reader = new HBaseTimelineReaderImpl();
reader.init(util.getConfiguration());
reader.start();
}
@After
public void stop() throws Exception {
if (reader != null) {
reader.stop();
reader.close();
}
}
private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) {
assertEquals(m1.size(), m2.size());
for (Map.Entry<Long, Number> entry : m2.entrySet()) {
@ -163,15 +500,11 @@ public class TestHBaseTimelineStorage {
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
String cluster = "cluster_test_write_app";
String user = "user1";
String flow = "some_flow_name";
@ -256,8 +589,8 @@ public class TestHBaseTimelineStorage {
matchMetrics(metricValues, metricMap);
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(),
TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
@ -290,10 +623,6 @@ public class TestHBaseTimelineStorage {
hbi.stop();
hbi.close();
}
if (hbr != null) {
hbr.stop();
hbr.close();
}
}
}
@ -362,15 +691,11 @@ public class TestHBaseTimelineStorage {
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
String cluster = "cluster_test_write_entity";
String user = "user1";
String flow = "some_flow_name";
@ -468,12 +793,13 @@ public class TestHBaseTimelineStorage {
assertEquals(17, colCount);
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(),
TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
null, null, null, null, null, null,
EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
@ -505,10 +831,6 @@ public class TestHBaseTimelineStorage {
hbi.stop();
hbi.close();
}
if (hbr != null) {
hbr.stop();
hbr.close();
}
}
}
@ -559,15 +881,11 @@ public class TestHBaseTimelineStorage {
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
String cluster = "cluster_test_events";
String user = "user2";
String flow = "other_flow_name";
@ -612,11 +930,11 @@ public class TestHBaseTimelineStorage {
}
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(),
TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
entity.getType(), entity.getId(),
TimelineEntity e2 = reader.getEntity(user, cluster, null, null, appName,
entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertNotNull(e2);
@ -641,10 +959,6 @@ public class TestHBaseTimelineStorage {
hbi.stop();
hbi.close();
}
if (hbr != null) {
hbr.stop();
hbr.close();
}
}
}
@ -665,15 +979,11 @@ public class TestHBaseTimelineStorage {
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
String cluster = "cluster_test_empty_eventkey";
String user = "user_emptyeventkey";
String flow = "other_flow_name";
@ -726,12 +1036,13 @@ public class TestHBaseTimelineStorage {
assertEquals(1, rowCount);
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(),
TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
null, null, null, null, null, null,
EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
@ -748,8 +1059,6 @@ public class TestHBaseTimelineStorage {
} finally {
hbi.stop();
hbi.close();
hbr.stop();;
hbr.close();
}
}
@ -816,6 +1125,291 @@ public class TestHBaseTimelineStorage {
}
}
@Test
public void testReadEntities() throws Exception {
TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
1002345678919L, "application_1231111111_1111","world", "hello", null,
null, EnumSet.of(Field.ALL));
assertNotNull(e1);
assertEquals(3, e1.getConfigs().size());
assertEquals(1, e1.getIsRelatedToEntities().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, null, null, null, null,
null, EnumSet.of(Field.ALL));
assertEquals(3, es1.size());
}
@Test
public void testReadEntitiesDefaultView() throws Exception {
TimelineEntity e1 =
reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
"application_1231111111_1111","world", "hello", null, null, null);
assertNotNull(e1);
assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
e1.getRelatesToEntities().isEmpty());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, null, null, null, null,
null, null);
assertEquals(3, es1.size());
for (TimelineEntity e : es1) {
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
e.getRelatesToEntities().isEmpty());
}
}
@Test
public void testReadEntitiesByFields() throws Exception {
TimelineEntity e1 =
reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
"application_1231111111_1111","world", "hello", null, null,
EnumSet.of(Field.INFO, Field.CONFIGS));
assertNotNull(e1);
assertEquals(3, e1.getConfigs().size());
assertEquals(0, e1.getIsRelatedToEntities().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, null, null, null, null,
null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
assertEquals(3, es1.size());
int metricsCnt = 0;
int isRelatedToCnt = 0;
int infoCnt = 0;
for (TimelineEntity entity : es1) {
metricsCnt += entity.getMetrics().size();
isRelatedToCnt += entity.getIsRelatedToEntities().size();
infoCnt += entity.getInfo().size();
}
assertEquals(0, infoCnt);
assertEquals(2, isRelatedToCnt);
assertEquals(3, metricsCnt);
}
@Test
public void testReadEntitiesConfigPrefix() throws Exception {
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
TimelineEntity e1 =
reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
"application_1231111111_1111","world", "hello", list, null, null);
assertNotNull(e1);
assertEquals(1, e1.getConfigs().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, null, null, null,
list, null, null);
int cfgCnt = 0;
for (TimelineEntity entity : es1) {
cfgCnt += entity.getConfigs().size();
}
assertEquals(3, cfgCnt);
}
@Test
public void testReadEntitiesConfigFilterPrefix() throws Exception {
Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, confFilters, null, null,
list, null, null);
assertEquals(1, entities.size());
int cfgCnt = 0;
for (TimelineEntity entity : entities) {
cfgCnt += entity.getConfigs().size();
}
assertEquals(2, cfgCnt);
}
@Test
public void testReadEntitiesMetricPrefix() throws Exception {
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
TimelineEntity e1 =
reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
"application_1231111111_1111","world", "hello", null, list, null);
assertNotNull(e1);
assertEquals(1, e1.getMetrics().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, null, null, null, null,
list, null);
int metricCnt = 0;
for (TimelineEntity entity : es1) {
metricCnt += entity.getMetrics().size();
}
assertEquals(2, metricCnt);
}
@Test
public void testReadEntitiesMetricFilterPrefix() throws Exception {
Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, "application_1231111111_1111","world",
null, null, null, null, null, null, null, null, null, metricFilters,
null, null, list, null);
assertEquals(1, entities.size());
int metricCnt = 0;
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
}
assertEquals(1, metricCnt);
}
@Test
public void testReadApps() throws Exception {
TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
EnumSet.of(Field.ALL));
assertNotNull(e1);
assertEquals(3, e1.getConfigs().size());
assertEquals(1, e1.getIsRelatedToEntities().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, null, null, null, null, null,
EnumSet.of(Field.ALL));
assertEquals(3, es1.size());
}
@Test
public void testReadAppsDefaultView() throws Exception {
TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null);
assertNotNull(e1);
assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
e1.getRelatesToEntities().isEmpty());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, null, null, null, null, null, null);
assertEquals(3, es1.size());
for (TimelineEntity e : es1) {
assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
e.getRelatesToEntities().isEmpty());
}
}
@Test
public void testReadAppsByFields() throws Exception {
TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
EnumSet.of(Field.INFO, Field.CONFIGS));
assertNotNull(e1);
assertEquals(3, e1.getConfigs().size());
assertEquals(0, e1.getIsRelatedToEntities().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, null, null, null, null, null,
EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
assertEquals(3, es1.size());
int metricsCnt = 0;
int isRelatedToCnt = 0;
int infoCnt = 0;
for (TimelineEntity entity : es1) {
metricsCnt += entity.getMetrics().size();
isRelatedToCnt += entity.getIsRelatedToEntities().size();
infoCnt += entity.getInfo().size();
}
assertEquals(0, infoCnt);
assertEquals(2, isRelatedToCnt);
assertEquals(3, metricsCnt);
}
@Test
public void testReadAppsConfigPrefix() throws Exception {
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null);
assertNotNull(e1);
assertEquals(1, e1.getConfigs().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, null, null, null, list, null, null);
int cfgCnt = 0;
for (TimelineEntity entity : es1) {
cfgCnt += entity.getConfigs().size();
}
assertEquals(3, cfgCnt);
}
@Test
public void testReadAppsConfigFilterPrefix() throws Exception {
Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, confFilters, null, null, list, null, null);
assertEquals(1, entities.size());
int cfgCnt = 0;
for (TimelineEntity entity : entities) {
cfgCnt += entity.getConfigs().size();
}
assertEquals(2, cfgCnt);
}
@Test
public void testReadAppsMetricPrefix() throws Exception {
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
1002345678919L, "application_1111111111_2222",
TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null);
assertNotNull(e1);
assertEquals(1, e1.getMetrics().size());
Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, null, null, null, null, list, null);
int metricCnt = 0;
for (TimelineEntity entity : es1) {
metricCnt += entity.getMetrics().size();
}
assertEquals(2, metricCnt);
}
@Test
public void testReadAppsMetricFilterPrefix() throws Exception {
TimelineFilterList list =
new TimelineFilterList(Operator.OR,
new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
"some_flow_name", 1002345678919L, null,
TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
null, null, null, null, null, metricFilters, null, null, list, null);
int metricCnt = 0;
assertEquals(1, entities.size());
for (TimelineEntity entity : entities) {
metricCnt += entity.getMetrics().size();
}
assertEquals(1, metricCnt);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();

View File

@ -182,7 +182,7 @@ public class TestHBaseStorageFlowActivity {
Set<TimelineEntity> entities =
hbr.getEntities(null, cluster, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
null, null, null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
@ -238,7 +238,7 @@ public class TestHBaseStorageFlowActivity {
Set<TimelineEntity> entities =
hbr.getEntities(user, cluster, flow, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
null, null, null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity entity = (FlowActivityEntity)e;
@ -353,7 +353,7 @@ public class TestHBaseStorageFlowActivity {
Set<TimelineEntity> entities =
hbr.getEntities(null, cluster, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
null, null, null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity flowActivity = (FlowActivityEntity)e;

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
@ -44,9 +45,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -178,9 +183,8 @@ public class TestHBaseStorageFlowRun {
hbr.init(c1);
hbr.start();
// get the flow run entity
TimelineEntity entity =
hbr.getEntity(user, cluster, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
FlowRunEntity flowRun = (FlowRunEntity)entity;
assertEquals(minStartTs, flowRun.getStartTime());
@ -238,9 +242,8 @@ public class TestHBaseStorageFlowRun {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
TimelineEntity entity =
hbr.getEntity(user, cluster, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(2, metrics.size());
@ -305,6 +308,181 @@ public class TestHBaseStorageFlowRun {
assertEquals(1, rowCount);
}
@Test
public void testWriteFlowRunMetricsPrefix() throws Exception {
String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
String user = "testWriteFlowRunMetricsOneFlow_user1";
String flow = "testing_flowRun_metrics_flow_name";
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.flush();
} finally {
hbi.close();
}
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
TimelineFilterList metricsToRetrieve =
new TimelineFilterList(new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
metric1.substring(0, metric1.indexOf("_") + 1)));
TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null,
metricsToRetrieve, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(1, metrics.size());
for (TimelineMetric metric : metrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case metric1:
assertEquals(141L, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid,
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
null, null, null, null, null, null, null, null, null,
metricsToRetrieve, null);
assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) {
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
assertEquals(1, timelineMetrics.size());
for (TimelineMetric metric : timelineMetrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case metric1:
assertEquals(141L, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
}
} finally {
hbr.close();
}
}
@Test
public void testWriteFlowRunsMetricFields() throws Exception {
String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
String user = "testWriteFlowRunMetricsOneFlow_user1";
String flow = "testing_flowRun_metrics_flow_name";
String flowVersion = "CF7022C10F1354";
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.flush();
} finally {
hbi.close();
}
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
// use the timeline reader to verify data
HBaseTimelineReaderImpl hbr = null;
try {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid,
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) {
assertEquals(0, timelineEntity.getMetrics().size());
}
entities = hbr.getEntities(user, cluster, flow, runid,
null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
null, null, null, null, null, null, null, null, null,
null, EnumSet.of(Field.METRICS));
assertEquals(1, entities.size());
for (TimelineEntity timelineEntity : entities) {
Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
assertEquals(2, timelineMetrics.size());
for (TimelineMetric metric : timelineMetrics) {
String id = metric.getId();
Map<Long, Number> values = metric.getValues();
assertEquals(1, values.size());
Number value = null;
for (Number n : values.values()) {
value = n;
}
switch (id) {
case metric1:
assertEquals(141L, value);
break;
case metric2:
assertEquals(57L, value);
break;
default:
fail("unrecognized metric: " + id);
}
}
}
} finally {
hbr.close();
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();