YARN-4053. Change the way metric values are stored in HBase Storage (Varun Saxena via sjlee)

This commit is contained in:
Sangjin Lee 2015-11-20 10:03:02 -08:00
parent 09649005ca
commit 51254a6b51
16 changed files with 521 additions and 58 deletions

View File

@ -137,7 +137,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
} }
// read the start time // read the start time
Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result); Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
if (startTime != null) { if (startTime != null) {
flowRun.setStartTime(startTime.longValue()); flowRun.setStartTime(startTime.longValue());
} }
@ -147,7 +147,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
} }
// read the end time if available // read the end time if available
Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result); Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
if (endTime != null) { if (endTime != null) {
flowRun.setMaxEndTime(endTime.longValue()); flowRun.setMaxEndTime(endTime.longValue());
} }

View File

@ -26,8 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/** /**
@ -63,7 +66,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
/** /**
* Metrics are stored with the metric name as the column name. * Metrics are stored with the metric name as the column name.
*/ */
METRIC(ApplicationColumnFamily.METRICS, null); METRIC(ApplicationColumnFamily.METRICS, null,
LongConverter.getInstance());
private final ColumnHelper<ApplicationTable> column; private final ColumnHelper<ApplicationTable> column;
private final ColumnFamily<ApplicationTable> columnFamily; private final ColumnFamily<ApplicationTable> columnFamily;
@ -83,7 +87,20 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/ */
private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
String columnPrefix) { String columnPrefix) {
column = new ColumnHelper<ApplicationTable>(columnFamily); this(columnFamily, columnPrefix, GenericConverter.getInstance());
}
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
* @param converter used to encode/decode values to be stored in HBase for
* this column prefix.
*/
private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
String columnPrefix, ValueConverter converter) {
column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix; this.columnPrefix = columnPrefix;
if (columnPrefix == null) { if (columnPrefix == null) {

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/** /**
@ -50,9 +49,20 @@ public class ColumnHelper<T> {
*/ */
private final byte[] columnFamilyBytes; private final byte[] columnFamilyBytes;
private final ValueConverter converter;
public ColumnHelper(ColumnFamily<T> columnFamily) { public ColumnHelper(ColumnFamily<T> columnFamily) {
this(columnFamily, GenericConverter.getInstance());
}
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
columnFamilyBytes = columnFamily.getBytes(); columnFamilyBytes = columnFamily.getBytes();
if (converter == null) {
this.converter = GenericConverter.getInstance();
} else {
this.converter = converter;
}
} }
/** /**
@ -83,7 +93,7 @@ public class ColumnHelper<T> {
Put p = new Put(rowKey); Put p = new Put(rowKey);
timestamp = getPutTimestamp(timestamp, attributes); timestamp = getPutTimestamp(timestamp, attributes);
p.addColumn(columnFamilyBytes, columnQualifier, timestamp, p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
GenericObjectMapper.write(inputValue)); converter.encodeValue(inputValue));
if ((attributes != null) && (attributes.length > 0)) { if ((attributes != null) && (attributes.length > 0)) {
for (Attribute attribute : attributes) { for (Attribute attribute : attributes) {
p.setAttribute(attribute.getName(), attribute.getValue()); p.setAttribute(attribute.getName(), attribute.getValue());
@ -148,7 +158,7 @@ public class ColumnHelper<T> {
// ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
// that. // that.
byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes); byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
return GenericObjectMapper.read(value); return converter.decodeValue(value);
} }
/** /**
@ -206,7 +216,7 @@ public class ColumnHelper<T> {
if (cells != null) { if (cells != null) {
for (Entry<Long, byte[]> cell : cells.entrySet()) { for (Entry<Long, byte[]> cell : cells.entrySet()) {
V value = V value =
(V) GenericObjectMapper.read(cell.getValue()); (V) converter.decodeValue(cell.getValue());
cellResults.put( cellResults.put(
TimestampGenerator.getTruncatedTimestamp(cell.getKey()), TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
value); value);
@ -266,7 +276,7 @@ public class ColumnHelper<T> {
// If this column has the prefix we want // If this column has the prefix we want
if (columnName != null) { if (columnName != null) {
Object value = GenericObjectMapper.read(entry.getValue()); Object value = converter.decodeValue(entry.getValue());
results.put(columnName, value); results.put(columnName, value);
} }
} }
@ -313,7 +323,7 @@ public class ColumnHelper<T> {
// This is the prefix that we want // This is the prefix that we want
byte[][] columnQualifierParts = byte[][] columnQualifierParts =
Separator.VALUES.split(columnNameParts[1]); Separator.VALUES.split(columnNameParts[1]);
Object value = GenericObjectMapper.read(entry.getValue()); Object value = converter.decodeValue(entry.getValue());
// we return the columnQualifier in parts since we don't know // we return the columnQualifier in parts since we don't know
// which part is of which data type // which part is of which data type
results.put(columnQualifierParts, value); results.put(columnQualifierParts, value);
@ -371,6 +381,11 @@ public class ColumnHelper<T> {
Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier)); Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
return columnQualifier; return columnQualifier;
} }
public ValueConverter getValueConverter() {
return converter;
}
/** /**
* @param columnPrefixBytes The byte representation for the column prefix. * @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}. * Should not contain {@link Separator#QUALIFIERS}.

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
/**
* Uses GenericObjectMapper to encode objects as bytes and decode bytes as
* objects.
*/
public final class GenericConverter implements ValueConverter {
private static final GenericConverter INSTANCE = new GenericConverter();
private GenericConverter() {
}
public static GenericConverter getInstance() {
return INSTANCE;
}
@Override
public byte[] encodeValue(Object value) throws IOException {
return GenericObjectMapper.write(value);
}
@Override
public Object decodeValue(byte[] bytes) throws IOException {
return GenericObjectMapper.read(bytes);
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Encodes a value by interpreting it as a Long and converting it to bytes and
* decodes a set of bytes as a Long.
*/
public final class LongConverter implements NumericValueConverter {
private static final LongConverter INSTANCE = new LongConverter();
private LongConverter() {
}
public static LongConverter getInstance() {
return INSTANCE;
}
@Override
public byte[] encodeValue(Object value) throws IOException {
if (!TimelineStorageUtils.isIntegralValue(value)) {
throw new IOException("Expected integral value");
}
return Bytes.toBytes(((Number)value).longValue());
}
@Override
public Object decodeValue(byte[] bytes) throws IOException {
if (bytes == null) {
return null;
}
return Bytes.toLong(bytes);
}
/**
* Compares two numbers as longs. If either number is null, it will be taken
* as 0.
* @param num1
* @param num2
* @return -1 if num1 is less than num2, 0 if num1 is equal to num2 and 1 if
* num1 is greater than num2.
*/
@Override
public int compare(Number num1, Number num2) {
return Long.compare((num1 == null) ? 0L : num1.longValue(),
(num2 == null) ? 0L : num2.longValue());
}
@Override
public Number add(Number num1, Number num2, Number...numbers) {
long sum = ((num1 == null) ? 0L : num1.longValue()) +
((num2 == null) ? 0L : num2.longValue());
for (Number num : numbers) {
sum = sum + ((num == null) ? 0L : num.longValue());
}
return sum;
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.util.Comparator;
/**
* Extends ValueConverter interface for numeric converters to support numerical
* operations such as comparison, addition, etc.
*/
public interface NumericValueConverter extends ValueConverter,
Comparator<Number> {
/**
* Adds two or more numbers. If either of the numbers are null, it is taken as
* 0.
* @param num1
* @param num2
* @param numbers
* @return result after adding up the numbers.
*/
Number add(Number num1, Number num2, Number...numbers);
}

View File

@ -472,4 +472,15 @@ public class TimelineStorageUtils {
} }
return true; return true;
} }
/**
* Checks if passed object is of integral type(Short/Integer/Long).
* @param obj
* @return true if object passed is of type Short or Integer or Long, false
* otherwise.
*/
public static boolean isIntegralValue(Object obj) {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
/**
* Converter used to encode/decode value associated with a column prefix or a
* column.
*/
public interface ValueConverter {
/**
* Encode an object as a byte array depending on the converter implementation.
* @param value
* @return a byte array
* @throws IOException
*/
byte[] encodeValue(Object value) throws IOException;
/**
* Decode a byte array and convert it into an object depending on the
* converter implementation.
* @param bytes
* @return an object
* @throws IOException
*/
Object decodeValue(byte[] bytes) throws IOException;
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
* a set of utility classes used across backend storage reader and writer.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.entity; package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;

View File

@ -26,8 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/** /**
@ -63,7 +66,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
/** /**
* Metrics are stored with the metric name as the column name. * Metrics are stored with the metric name as the column name.
*/ */
METRIC(EntityColumnFamily.METRICS, null); METRIC(EntityColumnFamily.METRICS, null,
LongConverter.getInstance());
private final ColumnHelper<EntityTable> column; private final ColumnHelper<EntityTable> column;
private final ColumnFamily<EntityTable> columnFamily; private final ColumnFamily<EntityTable> columnFamily;
@ -83,7 +87,20 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/ */
EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
String columnPrefix) { String columnPrefix) {
column = new ColumnHelper<EntityTable>(columnFamily); this(columnFamily, columnPrefix, GenericConverter.getInstance());
}
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
* @param converter used to encode/decode values to be stored in HBase for
* this column prefix.
*/
EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
String columnPrefix, ValueConverter converter) {
column = new ColumnHelper<EntityTable>(columnFamily, converter);
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix; this.columnPrefix = columnPrefix;
if (columnPrefix == null) { if (columnPrefix == null) {

View File

@ -24,9 +24,12 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/** /**
* Identifies fully qualified columns for the {@link FlowRunTable}. * Identifies fully qualified columns for the {@link FlowRunTable}.
@ -38,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
* application start times. * application start times.
*/ */
MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
AggregationOperation.MIN), AggregationOperation.MIN, LongConverter.getInstance()),
/** /**
* When the flow ended. This is the maximum of currently known application end * When the flow ended. This is the maximum of currently known application end
* times. * times.
*/ */
MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
AggregationOperation.MAX), AggregationOperation.MAX, LongConverter.getInstance()),
/** /**
* The version of the flow that this flow belongs to. * The version of the flow that this flow belongs to.
@ -60,13 +63,20 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily, private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
String columnQualifier, AggregationOperation aggOp) { String columnQualifier, AggregationOperation aggOp) {
this(columnFamily, columnQualifier, aggOp,
GenericConverter.getInstance());
}
private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
String columnQualifier, AggregationOperation aggOp,
ValueConverter converter) {
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier; this.columnQualifier = columnQualifier;
this.aggOp = aggOp; this.aggOp = aggOp;
// Future-proof by ensuring the right column prefix hygiene. // Future-proof by ensuring the right column prefix hygiene.
this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE
.encode(columnQualifier)); .encode(columnQualifier));
this.column = new ColumnHelper<FlowRunTable>(columnFamily); this.column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
} }
/** /**
@ -80,6 +90,10 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
return columnQualifierBytes.clone(); return columnQualifierBytes.clone();
} }
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
public AggregationOperation getAggregationOperation() { public AggregationOperation getAggregationOperation() {
return aggOp; return aggOp;
} }
@ -130,6 +144,10 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
return null; return null;
} }
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
/** /**
* Retrieve an {@link FlowRunColumn} given a name, or null if there is no * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
* match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/** /**
* Identifies partially qualified columns for the {@link FlowRunTable}. * Identifies partially qualified columns for the {@link FlowRunTable}.
@ -38,7 +40,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/** /**
* To store flow run info values. * To store flow run info values.
*/ */
METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM); METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM,
LongConverter.getInstance());
private final ColumnHelper<FlowRunTable> column; private final ColumnHelper<FlowRunTable> column;
private final ColumnFamily<FlowRunTable> columnFamily; private final ColumnFamily<FlowRunTable> columnFamily;
@ -61,8 +64,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* for this column. * for this column.
*/ */
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily, private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra) { String columnPrefix, AggregationOperation fra, ValueConverter converter) {
column = new ColumnHelper<FlowRunTable>(columnFamily); column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix; this.columnPrefix = columnPrefix;
if (columnPrefix == null) { if (columnPrefix == null) {
@ -86,6 +89,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
return columnPrefixBytes.clone(); return columnPrefixBytes.clone();
} }
public byte[] getColumnPrefixBytes(String qualifier) {
return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
}
public byte[] getColumnFamilyBytes() {
return columnFamily.getBytes();
}
public AggregationOperation getAttribute() { public AggregationOperation getAttribute() {
return aggOp; return aggOp;
} }
@ -205,6 +216,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
return null; return null;
} }
public ValueConverter getValueConverter() {
return column.getValueConverter();
}
/** /**
* Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
* no match. The following holds true: * no match. The following holds true:

View File

@ -39,8 +39,10 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/** /**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@ -113,6 +115,45 @@ class FlowScanner implements RegionScanner, Closeable {
return appId; return appId;
} }
/**
* Get value converter associated with a column or a column prefix. If nothing
* matches, generic converter is returned.
* @param colQualifierBytes
* @return value converter implementation.
*/
private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
// Iterate over all the column prefixes for flow run table and get the
// appropriate converter for the column qualifier passed if prefix matches.
for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
colQualifierBytes, 0, colPrefixBytes.length) == 0) {
return colPrefix.getValueConverter();
}
}
// Iterate over all the columns for flow run table and get the
// appropriate converter for the column qualifier passed if match occurs.
for (FlowRunColumn column : FlowRunColumn.values()) {
if (Bytes.compareTo(
column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
return column.getValueConverter();
}
}
// Return generic converter if nothing matches.
return GenericConverter.getInstance();
}
/**
* Checks if the converter is a numeric converter or not. For a converter to
* be numeric, it must implement {@link NumericValueConverter} interface.
* @param converter
* @return true, if converter is of type NumericValueConverter, false
* otherwise.
*/
private static boolean isNumericConverter(ValueConverter converter) {
return (converter instanceof NumericValueConverter);
}
/** /**
* This method loops through the cells in a given row of the * This method loops through the cells in a given row of the
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
@ -141,20 +182,32 @@ class FlowScanner implements RegionScanner, Closeable {
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>(); Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0; int addedCnt = 0;
ValueConverter converter = null;
while (((cell = peekAtNextCell(limit)) != null) while (((cell = peekAtNextCell(limit)) != null)
&& (limit <= 0 || addedCnt < limit)) { && (limit <= 0 || addedCnt < limit)) {
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp); if (converter != null && isNumericConverter(converter)) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
(NumericValueConverter)converter);
}
resetState(currentColumnCells, alreadySeenAggDim); resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier; currentColumnQualifier = newColumnQualifier;
currentAggOp = getCurrentAggOp(cell); currentAggOp = getCurrentAggOp(cell);
converter = getValueConverter(newColumnQualifier);
} }
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim); // No operation needs to be performed on non numeric converters.
if (!isNumericConverter(converter)) {
nextCell(limit);
continue;
}
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
(NumericValueConverter)converter);
nextCell(limit); nextCell(limit);
} }
if (!currentColumnCells.isEmpty()) { if (!currentColumnCells.isEmpty()) {
emitCells(cells, currentColumnCells, currentAggOp); emitCells(cells, currentColumnCells, currentAggOp,
(NumericValueConverter)converter);
} }
return hasMore(); return hasMore();
} }
@ -183,7 +236,8 @@ class FlowScanner implements RegionScanner, Closeable {
private void collectCells(SortedSet<Cell> currentColumnCells, private void collectCells(SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, Cell cell, AggregationOperation currentAggOp, Cell cell,
Set<String> alreadySeenAggDim) throws IOException { Set<String> alreadySeenAggDim, NumericValueConverter converter)
throws IOException {
if (currentAggOp == null) { if (currentAggOp == null) {
// not a min/max/metric cell, so just return it as is // not a min/max/metric cell, so just return it as is
currentColumnCells.add(cell); currentColumnCells.add(cell);
@ -197,7 +251,8 @@ class FlowScanner implements RegionScanner, Closeable {
currentColumnCells.add(cell); currentColumnCells.add(cell);
} else { } else {
Cell currentMinCell = currentColumnCells.first(); Cell currentMinCell = currentColumnCells.first();
Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp); Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
converter);
if (!currentMinCell.equals(newMinCell)) { if (!currentMinCell.equals(newMinCell)) {
currentColumnCells.remove(currentMinCell); currentColumnCells.remove(currentMinCell);
currentColumnCells.add(newMinCell); currentColumnCells.add(newMinCell);
@ -209,7 +264,8 @@ class FlowScanner implements RegionScanner, Closeable {
currentColumnCells.add(cell); currentColumnCells.add(cell);
} else { } else {
Cell currentMaxCell = currentColumnCells.first(); Cell currentMaxCell = currentColumnCells.first();
Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp); Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
converter);
if (!currentMaxCell.equals(newMaxCell)) { if (!currentMaxCell.equals(newMaxCell)) {
currentColumnCells.remove(currentMaxCell); currentColumnCells.remove(currentMaxCell);
currentColumnCells.add(newMaxCell); currentColumnCells.add(newMaxCell);
@ -245,7 +301,8 @@ class FlowScanner implements RegionScanner, Closeable {
* parameter. * parameter.
*/ */
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells, private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp) throws IOException { AggregationOperation currentAggOp, NumericValueConverter converter)
throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0; return 0;
} }
@ -261,7 +318,7 @@ class FlowScanner implements RegionScanner, Closeable {
return currentColumnCells.size(); return currentColumnCells.size();
case SUM: case SUM:
case SUM_FINAL: case SUM_FINAL:
Cell sumCell = processSummation(currentColumnCells); Cell sumCell = processSummation(currentColumnCells, converter);
cells.add(sumCell); cells.add(sumCell);
return 1; return 1;
default: default:
@ -276,24 +333,24 @@ class FlowScanner implements RegionScanner, Closeable {
* sum of a metric for a flow run is the summation at the point of the last * sum of a metric for a flow run is the summation at the point of the last
* metric update in that flow till that time. * metric update in that flow till that time.
*/ */
private Cell processSummation(SortedSet<Cell> currentColumnCells) private Cell processSummation(SortedSet<Cell> currentColumnCells,
throws IOException { NumericValueConverter converter) throws IOException {
Number sum = 0; Number sum = 0;
Number currentValue = 0; Number currentValue = 0;
long ts = 0L; long ts = 0L;
long mostCurrentTimestamp = 0l; long mostCurrentTimestamp = 0L;
Cell mostRecentCell = null; Cell mostRecentCell = null;
for (Cell cell : currentColumnCells) { for (Cell cell : currentColumnCells) {
currentValue = (Number) GenericObjectMapper.read(CellUtil currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
.cloneValue(cell));
ts = cell.getTimestamp(); ts = cell.getTimestamp();
if (mostCurrentTimestamp < ts) { if (mostCurrentTimestamp < ts) {
mostCurrentTimestamp = ts; mostCurrentTimestamp = ts;
mostRecentCell = cell; mostRecentCell = cell;
} }
sum = sum.longValue() + currentValue.longValue(); sum = converter.add(sum, currentValue);
} }
Cell sumCell = createNewCell(mostRecentCell, sum); byte[] sumBytes = converter.encodeValue(sum);
Cell sumCell = createNewCell(mostRecentCell, sumBytes);
return sumCell; return sumCell;
} }
@ -308,18 +365,20 @@ class FlowScanner implements RegionScanner, Closeable {
* @throws IOException * @throws IOException
*/ */
private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
AggregationOperation currentAggOp) throws IOException { AggregationOperation currentAggOp, NumericValueConverter converter)
throws IOException {
if (previouslyChosenCell == null) { if (previouslyChosenCell == null) {
return currentCell; return currentCell;
} }
try { try {
long previouslyChosenCellValue = ((Number) GenericObjectMapper Number previouslyChosenCellValue = (Number)converter.decodeValue(
.read(CellUtil.cloneValue(previouslyChosenCell))).longValue(); CellUtil.cloneValue(previouslyChosenCell));
long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil Number currentCellValue = (Number) converter.decodeValue(CellUtil
.cloneValue(currentCell))).longValue(); .cloneValue(currentCell));
switch (currentAggOp) { switch (currentAggOp) {
case MIN: case MIN:
if (currentCellValue < previouslyChosenCellValue) { if (converter.compare(
currentCellValue, previouslyChosenCellValue) < 0) {
// new value is minimum, hence return this cell // new value is minimum, hence return this cell
return currentCell; return currentCell;
} else { } else {
@ -327,7 +386,8 @@ class FlowScanner implements RegionScanner, Closeable {
return previouslyChosenCell; return previouslyChosenCell;
} }
case MAX: case MAX:
if (currentCellValue > previouslyChosenCellValue) { if (converter.compare(
currentCellValue, previouslyChosenCellValue) > 0) {
// new value is max, hence return this cell // new value is max, hence return this cell
return currentCell; return currentCell;
} else { } else {
@ -343,8 +403,8 @@ class FlowScanner implements RegionScanner, Closeable {
} }
} }
private Cell createNewCell(Cell origCell, Number number) throws IOException { private Cell createNewCell(Cell origCell, byte[] newValue)
byte[] newValue = GenericObjectMapper.write(number); throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell), return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -90,6 +91,15 @@ public class TestHBaseTimelineStorage {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
} }
private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) {
assertEquals(m1.size(), m2.size());
for (Map.Entry<Long, Number> entry : m2.entrySet()) {
Number val = m1.get(entry.getKey());
assertNotNull(val);
assertEquals(val.longValue(), entry.getValue().longValue());
}
}
@Test @Test
public void testWriteApplicationToHBase() throws Exception { public void testWriteApplicationToHBase() throws Exception {
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
@ -243,7 +253,7 @@ public class TestHBaseTimelineStorage {
ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
assertEquals(metricValues, metricMap); matchMetrics(metricValues, metricMap);
// read the timeline entity using the reader this time // read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
@ -273,7 +283,7 @@ public class TestHBaseTimelineStorage {
assertEquals(metrics, metrics2); assertEquals(metrics, metrics2);
for (TimelineMetric metric2 : metrics2) { for (TimelineMetric metric2 : metrics2) {
Map<Long, Number> metricValues2 = metric2.getValues(); Map<Long, Number> metricValues2 = metric2.getValues();
assertEquals(metricValues, metricValues2); matchMetrics(metricValues, metricValues2);
} }
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -451,7 +461,7 @@ public class TestHBaseTimelineStorage {
EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
assertEquals(metricValues, metricMap); matchMetrics(metricValues, metricMap);
} }
} }
assertEquals(1, rowCount); assertEquals(1, rowCount);
@ -488,7 +498,7 @@ public class TestHBaseTimelineStorage {
assertEquals(metrics, metrics2); assertEquals(metrics, metrics2);
for (TimelineMetric metric2 : metrics2) { for (TimelineMetric metric2 : metrics2) {
Map<Long, Number> metricValues2 = metric2.getValues(); Map<Long, Number> metricValues2 = metric2.getValues();
assertEquals(metricValues, metricValues2); matchMetrics(metricValues, metricValues2);
} }
} finally { } finally {
if (hbi != null) { if (hbi != null) {
@ -743,6 +753,69 @@ public class TestHBaseTimelineStorage {
} }
} }
@Test
public void testNonIntegralMetricValues() throws IOException {
TimelineEntities teApp = new TimelineEntities();
ApplicationEntity entityApp = new ApplicationEntity();
String appId = "application_1000178881110_2002";
entityApp.setId(appId);
entityApp.setCreatedTime(1425016501000L);
entityApp.setModifiedTime(1425026901000L);
// add metrics with floating point values
Set<TimelineMetric> metricsApp = new HashSet<>();
TimelineMetric mApp = new TimelineMetric();
mApp.setId("MAP_SLOT_MILLIS");
Map<Long, Number> metricAppValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricAppValues.put(ts - 20, 10.5);
metricAppValues.put(ts - 10, 20.5);
mApp.setType(Type.TIME_SERIES);
mApp.setValues(metricAppValues);
metricsApp.add(mApp);
entityApp.addMetrics(metricsApp);
teApp.addEntity(entityApp);
TimelineEntities teEntity = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
entity.setId("hello");
entity.setType("world");
entity.setCreatedTime(1425016501000L);
entity.setModifiedTime(1425026901000L);
// add metrics with floating point values
Set<TimelineMetric> metricsEntity = new HashSet<>();
TimelineMetric mEntity = new TimelineMetric();
mEntity.setId("MAP_SLOT_MILLIS");
mEntity.addValue(ts - 20, 10.5);
metricsEntity.add(mEntity);
entity.addMetrics(metricsEntity);
teEntity.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
// Writing application entity.
try {
hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp);
Assert.fail("Expected an exception as metric values are non integral");
} catch (IOException e) {}
// Writing generic entity.
try {
hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity);
Assert.fail("Expected an exception as metric values are non integral");
} catch (IOException e) {}
hbi.stop();
} finally {
if (hbi != null) {
hbi.stop();
hbi.close();
}
}
}
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster(); util.shutdownMiniCluster();

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -164,10 +165,10 @@ public class TestHBaseStorageFlowRun {
.getBytes()); .getBytes());
assertEquals(2, r1.size()); assertEquals(2, r1.size());
long starttime = (Long) GenericObjectMapper.read(values long starttime = Bytes.toLong(values.get(
.get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
assertEquals(minStartTs, starttime); assertEquals(minStartTs, starttime);
assertEquals(endTs, GenericObjectMapper.read(values assertEquals(endTs, Bytes.toLong(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
// use the timeline reader to verify data // use the timeline reader to verify data
@ -253,10 +254,10 @@ public class TestHBaseStorageFlowRun {
} }
switch (id) { switch (id) {
case metric1: case metric1:
assertEquals(141, value); assertEquals(141L, value);
break; break;
case metric2: case metric2:
assertEquals(57, value); assertEquals(57L, value);
break; break;
default: default:
fail("unrecognized metric: " + id); fail("unrecognized metric: " + id);
@ -292,14 +293,14 @@ public class TestHBaseStorageFlowRun {
byte[] q = ColumnHelper.getColumnQualifier( byte[] q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
assertTrue(values.containsKey(q)); assertTrue(values.containsKey(q));
assertEquals(141, GenericObjectMapper.read(values.get(q))); assertEquals(141L, Bytes.toLong(values.get(q)));
// check metric2 // check metric2
assertEquals(2, values.size()); assertEquals(2, values.size());
q = ColumnHelper.getColumnQualifier( q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
assertTrue(values.containsKey(q)); assertTrue(values.containsKey(q));
assertEquals(57, GenericObjectMapper.read(values.get(q))); assertEquals(57L, Bytes.toLong(values.get(q)));
} }
assertEquals(1, rowCount); assertEquals(1, rowCount);
} }