YARN-6850 Ensure that supplemented timestamp is stored only for flow run metrics (Contributed by Varun Saxena via Vrushali C)

This commit is contained in:
Vrushali C 2017-07-24 15:54:52 -07:00 committed by Varun Saxena
parent 70078e91e3
commit 61136d03f2
5 changed files with 44 additions and 28 deletions

View File

@ -52,11 +52,28 @@ public class ColumnHelper<T> {
private final ValueConverter converter;
private final boolean supplementTs;
public ColumnHelper(ColumnFamily<T> columnFamily) {
this(columnFamily, GenericConverter.getInstance());
}
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
this(columnFamily, converter, false);
}
/**
* @param columnFamily column family implementation.
* @param converter converter use to encode/decode values stored in the column
* or column prefix.
* @param needSupplementTs flag to indicate if cell timestamp needs to be
* modified for this column by calling
* {@link TimestampGenerator#getSupplementedTimestamp(long, String)}. This
* would be required for columns(such as metrics in flow run table) where
* potential collisions can occur due to same timestamp.
*/
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter,
boolean needSupplementTs) {
this.columnFamily = columnFamily;
columnFamilyBytes = columnFamily.getBytes();
if (converter == null) {
@ -64,6 +81,7 @@ public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
} else {
this.converter = converter;
}
this.supplementTs = needSupplementTs;
}
/**
@ -106,18 +124,24 @@ public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
}
/*
* Figures out the cell timestamp used in the Put For storing into flow run
* table. We would like to left shift the timestamp and supplement it with the
* AppId id so that there are no collisions in the flow run table's cells
* Figures out the cell timestamp used in the Put For storing.
* Will supplement the timestamp if required. Typically done for flow run
* table.If we supplement the timestamp, we left shift the timestamp and
* supplement it with the AppId id so that there are no collisions in the flow
* run table's cells.
*/
private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
if (timestamp == null) {
timestamp = System.currentTimeMillis();
}
String appId = getAppIdFromAttributes(attributes);
long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
timestamp, appId);
return supplementedTS;
if (!this.supplementTs) {
return timestamp;
} else {
String appId = getAppIdFromAttributes(attributes);
long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
timestamp, appId);
return supplementedTS;
}
}
private String getAppIdFromAttributes(Attribute[] attributes) {
@ -234,9 +258,9 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
for (Entry<Long, byte[]> cell : cells.entrySet()) {
V value =
(V) converter.decodeValue(cell.getValue());
cellResults.put(
TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
value);
Long ts = supplementTs ? TimestampGenerator.
getTruncatedTimestamp(cell.getKey()) : cell.getKey();
cellResults.put(ts, value);
}
}
results.put(converterColumnKey, cellResults);

View File

@ -347,16 +347,8 @@ public static boolean isIntegralValue(Object obj) {
public static void setMetricsTimeRange(Query query, byte[] metricsCf,
long tsBegin, long tsEnd) {
if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) {
long supplementedTsBegin = tsBegin == 0 ? 0 :
TimestampGenerator.getSupplementedTimestamp(tsBegin, null);
long supplementedTsEnd =
(tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE :
TimestampGenerator.getSupplementedTimestamp(tsEnd + 1, null);
// Handle overflow by resetting time begin to 0 and time end to
// Long#MAX_VALUE, if required.
query.setColumnFamilyTimeRange(metricsCf,
((supplementedTsBegin < 0) ? 0 : supplementedTsBegin),
((supplementedTsEnd < 0) ? Long.MAX_VALUE : supplementedTsEnd));
tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1)));
}
}
}

View File

@ -69,7 +69,7 @@ private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra, ValueConverter converter,
boolean compoundColQual) {
column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
column = new ColumnHelper<FlowRunTable>(columnFamily, converter, true);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
if (columnPrefix == null) {

View File

@ -362,10 +362,10 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
private void setMetricsTimeRange(Query query) {
// Set time range for metric values.
HBaseTimelineStorageUtils.
setMetricsTimeRange(query, ApplicationColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
HBaseTimelineStorageUtils.setMetricsTimeRange(
query, ApplicationColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
}
@Override

View File

@ -471,10 +471,10 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
private void setMetricsTimeRange(Query query) {
// Set time range for metric values.
HBaseTimelineStorageUtils.
setMetricsTimeRange(query, EntityColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
HBaseTimelineStorageUtils.setMetricsTimeRange(
query, EntityColumnFamily.METRICS.getBytes(),
getDataToRetrieve().getMetricsTimeBegin(),
getDataToRetrieve().getMetricsTimeEnd());
}
@Override