diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index 46e427e6c48..9f95d445271 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -52,11 +52,28 @@ public class ColumnHelper { private final ValueConverter converter; + private final boolean supplementTs; + public ColumnHelper(ColumnFamily columnFamily) { this(columnFamily, GenericConverter.getInstance()); } public ColumnHelper(ColumnFamily columnFamily, ValueConverter converter) { + this(columnFamily, converter, false); + } + + /** + * @param columnFamily column family implementation. + * @param converter converter use to encode/decode values stored in the column + * or column prefix. + * @param needSupplementTs flag to indicate if cell timestamp needs to be + * modified for this column by calling + * {@link TimestampGenerator#getSupplementedTimestamp(long, String)}. This + * would be required for columns(such as metrics in flow run table) where + * potential collisions can occur due to same timestamp. + */ + public ColumnHelper(ColumnFamily columnFamily, ValueConverter converter, + boolean needSupplementTs) { this.columnFamily = columnFamily; columnFamilyBytes = columnFamily.getBytes(); if (converter == null) { @@ -64,6 +81,7 @@ public class ColumnHelper { } else { this.converter = converter; } + this.supplementTs = needSupplementTs; } /** @@ -106,18 +124,24 @@ public class ColumnHelper { } /* - * Figures out the cell timestamp used in the Put For storing into flow run - * table. We would like to left shift the timestamp and supplement it with the - * AppId id so that there are no collisions in the flow run table's cells + * Figures out the cell timestamp used in the Put For storing. + * Will supplement the timestamp if required. Typically done for flow run + * table.If we supplement the timestamp, we left shift the timestamp and + * supplement it with the AppId id so that there are no collisions in the flow + * run table's cells. */ private long getPutTimestamp(Long timestamp, Attribute[] attributes) { if (timestamp == null) { timestamp = System.currentTimeMillis(); } - String appId = getAppIdFromAttributes(attributes); - long supplementedTS = TimestampGenerator.getSupplementedTimestamp( - timestamp, appId); - return supplementedTS; + if (!this.supplementTs) { + return timestamp; + } else { + String appId = getAppIdFromAttributes(attributes); + long supplementedTS = TimestampGenerator.getSupplementedTimestamp( + timestamp, appId); + return supplementedTS; + } } private String getAppIdFromAttributes(Attribute[] attributes) { @@ -234,9 +258,9 @@ public class ColumnHelper { for (Entry cell : cells.entrySet()) { V value = (V) converter.decodeValue(cell.getValue()); - cellResults.put( - TimestampGenerator.getTruncatedTimestamp(cell.getKey()), - value); + Long ts = supplementTs ? TimestampGenerator. + getTruncatedTimestamp(cell.getKey()) : cell.getKey(); + cellResults.put(ts, value); } } results.put(converterColumnKey, cellResults); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java index 97e70b81dd6..b0d85274e60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java @@ -347,16 +347,8 @@ public final class HBaseTimelineStorageUtils { public static void setMetricsTimeRange(Query query, byte[] metricsCf, long tsBegin, long tsEnd) { if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) { - long supplementedTsBegin = tsBegin == 0 ? 0 : - TimestampGenerator.getSupplementedTimestamp(tsBegin, null); - long supplementedTsEnd = - (tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : - TimestampGenerator.getSupplementedTimestamp(tsEnd + 1, null); - // Handle overflow by resetting time begin to 0 and time end to - // Long#MAX_VALUE, if required. query.setColumnFamilyTimeRange(metricsCf, - ((supplementedTsBegin < 0) ? 0 : supplementedTsBegin), - ((supplementedTsEnd < 0) ? Long.MAX_VALUE : supplementedTsEnd)); + tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1))); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index 103674e2f76..f521cd71844 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -69,7 +69,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix { private FlowRunColumnPrefix(ColumnFamily columnFamily, String columnPrefix, AggregationOperation fra, ValueConverter converter, boolean compoundColQual) { - column = new ColumnHelper(columnFamily, converter); + column = new ColumnHelper(columnFamily, converter, true); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; if (columnPrefix == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index cda4510f411..0edd6a52680 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -362,10 +362,10 @@ class ApplicationEntityReader extends GenericEntityReader { private void setMetricsTimeRange(Query query) { // Set time range for metric values. - HBaseTimelineStorageUtils. - setMetricsTimeRange(query, ApplicationColumnFamily.METRICS.getBytes(), - getDataToRetrieve().getMetricsTimeBegin(), - getDataToRetrieve().getMetricsTimeEnd()); + HBaseTimelineStorageUtils.setMetricsTimeRange( + query, ApplicationColumnFamily.METRICS.getBytes(), + getDataToRetrieve().getMetricsTimeBegin(), + getDataToRetrieve().getMetricsTimeEnd()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 6b740e201ea..d7aca74fdc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -471,10 +471,10 @@ class GenericEntityReader extends TimelineEntityReader { private void setMetricsTimeRange(Query query) { // Set time range for metric values. - HBaseTimelineStorageUtils. - setMetricsTimeRange(query, EntityColumnFamily.METRICS.getBytes(), - getDataToRetrieve().getMetricsTimeBegin(), - getDataToRetrieve().getMetricsTimeEnd()); + HBaseTimelineStorageUtils.setMetricsTimeRange( + query, EntityColumnFamily.METRICS.getBytes(), + getDataToRetrieve().getMetricsTimeBegin(), + getDataToRetrieve().getMetricsTimeEnd()); } @Override