YARN-6850 Ensure that supplemented timestamp is stored only for flow run metrics (Contributed by Varun Saxena via Vrushali C)

This commit is contained in:
Vrushali C 2017-07-24 16:00:21 -07:00 committed by Varun Saxena
parent 484d7e9b39
commit 0f751a7fec
5 changed files with 44 additions and 28 deletions

View File

@ -50,11 +50,28 @@ public class ColumnHelper<T> {
private final ValueConverter converter; private final ValueConverter converter;
private final boolean supplementTs;
public ColumnHelper(ColumnFamily<T> columnFamily) { public ColumnHelper(ColumnFamily<T> columnFamily) {
this(columnFamily, GenericConverter.getInstance()); this(columnFamily, GenericConverter.getInstance());
} }
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) { public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
this(columnFamily, converter, false);
}
/**
* @param columnFamily column family implementation.
* @param converter converter use to encode/decode values stored in the column
* or column prefix.
* @param needSupplementTs flag to indicate if cell timestamp needs to be
* modified for this column by calling
* {@link TimestampGenerator#getSupplementedTimestamp(long, String)}. This
* would be required for columns(such as metrics in flow run table) where
* potential collisions can occur due to same timestamp.
*/
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter,
boolean needSupplementTs) {
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
columnFamilyBytes = columnFamily.getBytes(); columnFamilyBytes = columnFamily.getBytes();
if (converter == null) { if (converter == null) {
@ -62,6 +79,7 @@ public class ColumnHelper<T> {
} else { } else {
this.converter = converter; this.converter = converter;
} }
this.supplementTs = needSupplementTs;
} }
/** /**
@ -104,19 +122,25 @@ public class ColumnHelper<T> {
} }
/* /*
* Figures out the cell timestamp used in the Put For storing into flow run * Figures out the cell timestamp used in the Put For storing.
* table. We would like to left shift the timestamp and supplement it with the * Will supplement the timestamp if required. Typically done for flow run
* AppId id so that there are no collisions in the flow run table's cells * table.If we supplement the timestamp, we left shift the timestamp and
* supplement it with the AppId id so that there are no collisions in the flow
* run table's cells.
*/ */
private long getPutTimestamp(Long timestamp, Attribute[] attributes) { private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
if (timestamp == null) { if (timestamp == null) {
timestamp = System.currentTimeMillis(); timestamp = System.currentTimeMillis();
} }
if (!this.supplementTs) {
return timestamp;
} else {
String appId = getAppIdFromAttributes(attributes); String appId = getAppIdFromAttributes(attributes);
long supplementedTS = TimestampGenerator.getSupplementedTimestamp( long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
timestamp, appId); timestamp, appId);
return supplementedTS; return supplementedTS;
} }
}
private String getAppIdFromAttributes(Attribute[] attributes) { private String getAppIdFromAttributes(Attribute[] attributes) {
if (attributes == null) { if (attributes == null) {
@ -232,9 +256,9 @@ public class ColumnHelper<T> {
for (Entry<Long, byte[]> cell : cells.entrySet()) { for (Entry<Long, byte[]> cell : cells.entrySet()) {
V value = V value =
(V) converter.decodeValue(cell.getValue()); (V) converter.decodeValue(cell.getValue());
cellResults.put( Long ts = supplementTs ? TimestampGenerator.
TimestampGenerator.getTruncatedTimestamp(cell.getKey()), getTruncatedTimestamp(cell.getKey()) : cell.getKey();
value); cellResults.put(ts, value);
} }
} }
results.put(converterColumnKey, cellResults); results.put(converterColumnKey, cellResults);

View File

@ -313,16 +313,8 @@ public final class HBaseTimelineStorageUtils {
public static void setMetricsTimeRange(Query query, byte[] metricsCf, public static void setMetricsTimeRange(Query query, byte[] metricsCf,
long tsBegin, long tsEnd) { long tsBegin, long tsEnd) {
if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) { if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) {
long supplementedTsBegin = tsBegin == 0 ? 0 :
TimestampGenerator.getSupplementedTimestamp(tsBegin, null);
long supplementedTsEnd =
(tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE :
TimestampGenerator.getSupplementedTimestamp(tsEnd + 1, null);
// Handle overflow by resetting time begin to 0 and time end to
// Long#MAX_VALUE, if required.
query.setColumnFamilyTimeRange(metricsCf, query.setColumnFamilyTimeRange(metricsCf,
((supplementedTsBegin < 0) ? 0 : supplementedTsBegin), tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1)));
((supplementedTsEnd < 0) ? Long.MAX_VALUE : supplementedTsEnd));
} }
} }
} }

View File

@ -69,7 +69,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily, private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra, ValueConverter converter, String columnPrefix, AggregationOperation fra, ValueConverter converter,
boolean compoundColQual) { boolean compoundColQual) {
column = new ColumnHelper<FlowRunTable>(columnFamily, converter); column = new ColumnHelper<FlowRunTable>(columnFamily, converter, true);
this.columnFamily = columnFamily; this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix; this.columnPrefix = columnPrefix;
if (columnPrefix == null) { if (columnPrefix == null) {

View File

@ -362,8 +362,8 @@ class ApplicationEntityReader extends GenericEntityReader {
private void setMetricsTimeRange(Query query) { private void setMetricsTimeRange(Query query) {
// Set time range for metric values. // Set time range for metric values.
HBaseTimelineStorageUtils. HBaseTimelineStorageUtils.setMetricsTimeRange(
setMetricsTimeRange(query, ApplicationColumnFamily.METRICS.getBytes(), query, ApplicationColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(), getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd()); getDataToRetrieve().getMetricsTimeEnd());
} }

View File

@ -471,8 +471,8 @@ class GenericEntityReader extends TimelineEntityReader {
private void setMetricsTimeRange(Query query) { private void setMetricsTimeRange(Query query) {
// Set time range for metric values. // Set time range for metric values.
HBaseTimelineStorageUtils. HBaseTimelineStorageUtils.setMetricsTimeRange(
setMetricsTimeRange(query, EntityColumnFamily.METRICS.getBytes(), query, EntityColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(), getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd()); getDataToRetrieve().getMetricsTimeEnd());
} }