YARN-6850 Ensure that supplemented timestamp is stored only for flow run metrics (Contributed by Varun Saxena via Vrushali C)
This commit is contained in:
parent
f5bb5edbfb
commit
986f1e4f3c
|
@ -52,11 +52,28 @@ public class ColumnHelper<T> {
|
|||
|
||||
private final ValueConverter converter;
|
||||
|
||||
private final boolean supplementTs;
|
||||
|
||||
public ColumnHelper(ColumnFamily<T> columnFamily) {
|
||||
this(columnFamily, GenericConverter.getInstance());
|
||||
}
|
||||
|
||||
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) {
|
||||
this(columnFamily, converter, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param columnFamily column family implementation.
|
||||
* @param converter converter use to encode/decode values stored in the column
|
||||
* or column prefix.
|
||||
* @param needSupplementTs flag to indicate if cell timestamp needs to be
|
||||
* modified for this column by calling
|
||||
* {@link TimestampGenerator#getSupplementedTimestamp(long, String)}. This
|
||||
* would be required for columns(such as metrics in flow run table) where
|
||||
* potential collisions can occur due to same timestamp.
|
||||
*/
|
||||
public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter,
|
||||
boolean needSupplementTs) {
|
||||
this.columnFamily = columnFamily;
|
||||
columnFamilyBytes = columnFamily.getBytes();
|
||||
if (converter == null) {
|
||||
|
@ -64,6 +81,7 @@ public class ColumnHelper<T> {
|
|||
} else {
|
||||
this.converter = converter;
|
||||
}
|
||||
this.supplementTs = needSupplementTs;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -106,19 +124,25 @@ public class ColumnHelper<T> {
|
|||
}
|
||||
|
||||
/*
|
||||
* Figures out the cell timestamp used in the Put For storing into flow run
|
||||
* table. We would like to left shift the timestamp and supplement it with the
|
||||
* AppId id so that there are no collisions in the flow run table's cells
|
||||
* Figures out the cell timestamp used in the Put For storing.
|
||||
* Will supplement the timestamp if required. Typically done for flow run
|
||||
* table.If we supplement the timestamp, we left shift the timestamp and
|
||||
* supplement it with the AppId id so that there are no collisions in the flow
|
||||
* run table's cells.
|
||||
*/
|
||||
private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
|
||||
if (timestamp == null) {
|
||||
timestamp = System.currentTimeMillis();
|
||||
}
|
||||
if (!this.supplementTs) {
|
||||
return timestamp;
|
||||
} else {
|
||||
String appId = getAppIdFromAttributes(attributes);
|
||||
long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
|
||||
timestamp, appId);
|
||||
return supplementedTS;
|
||||
}
|
||||
}
|
||||
|
||||
private String getAppIdFromAttributes(Attribute[] attributes) {
|
||||
if (attributes == null) {
|
||||
|
@ -234,9 +258,9 @@ public class ColumnHelper<T> {
|
|||
for (Entry<Long, byte[]> cell : cells.entrySet()) {
|
||||
V value =
|
||||
(V) converter.decodeValue(cell.getValue());
|
||||
cellResults.put(
|
||||
TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
|
||||
value);
|
||||
Long ts = supplementTs ? TimestampGenerator.
|
||||
getTruncatedTimestamp(cell.getKey()) : cell.getKey();
|
||||
cellResults.put(ts, value);
|
||||
}
|
||||
}
|
||||
results.put(converterColumnKey, cellResults);
|
||||
|
|
|
@ -347,16 +347,8 @@ public final class HBaseTimelineStorageUtils {
|
|||
public static void setMetricsTimeRange(Query query, byte[] metricsCf,
|
||||
long tsBegin, long tsEnd) {
|
||||
if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) {
|
||||
long supplementedTsBegin = tsBegin == 0 ? 0 :
|
||||
TimestampGenerator.getSupplementedTimestamp(tsBegin, null);
|
||||
long supplementedTsEnd =
|
||||
(tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE :
|
||||
TimestampGenerator.getSupplementedTimestamp(tsEnd + 1, null);
|
||||
// Handle overflow by resetting time begin to 0 and time end to
|
||||
// Long#MAX_VALUE, if required.
|
||||
query.setColumnFamilyTimeRange(metricsCf,
|
||||
((supplementedTsBegin < 0) ? 0 : supplementedTsBegin),
|
||||
((supplementedTsEnd < 0) ? Long.MAX_VALUE : supplementedTsEnd));
|
||||
tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
|
|||
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
|
||||
String columnPrefix, AggregationOperation fra, ValueConverter converter,
|
||||
boolean compoundColQual) {
|
||||
column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
|
||||
column = new ColumnHelper<FlowRunTable>(columnFamily, converter, true);
|
||||
this.columnFamily = columnFamily;
|
||||
this.columnPrefix = columnPrefix;
|
||||
if (columnPrefix == null) {
|
||||
|
|
|
@ -362,8 +362,8 @@ class ApplicationEntityReader extends GenericEntityReader {
|
|||
|
||||
private void setMetricsTimeRange(Query query) {
|
||||
// Set time range for metric values.
|
||||
HBaseTimelineStorageUtils.
|
||||
setMetricsTimeRange(query, ApplicationColumnFamily.METRICS.getBytes(),
|
||||
HBaseTimelineStorageUtils.setMetricsTimeRange(
|
||||
query, ApplicationColumnFamily.METRICS.getBytes(),
|
||||
getDataToRetrieve().getMetricsTimeBegin(),
|
||||
getDataToRetrieve().getMetricsTimeEnd());
|
||||
}
|
||||
|
|
|
@ -471,8 +471,8 @@ class GenericEntityReader extends TimelineEntityReader {
|
|||
|
||||
private void setMetricsTimeRange(Query query) {
|
||||
// Set time range for metric values.
|
||||
HBaseTimelineStorageUtils.
|
||||
setMetricsTimeRange(query, EntityColumnFamily.METRICS.getBytes(),
|
||||
HBaseTimelineStorageUtils.setMetricsTimeRange(
|
||||
query, EntityColumnFamily.METRICS.getBytes(),
|
||||
getDataToRetrieve().getMetricsTimeBegin(),
|
||||
getDataToRetrieve().getMetricsTimeEnd());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue