YARN-5925. Extract hbase-backend-exclusive utility methods from TimelineStorageUtil. Contributed by Haibo Chen.

This commit is contained in:
Sangjin Lee 2016-12-09 16:17:24 -08:00
parent 2a28e8cf04
commit 55f5886ea2
14 changed files with 309 additions and 269 deletions

View File

@ -52,7 +52,7 @@
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
@ -78,7 +78,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
private static HBaseTestingUtility util;
private static long ts = System.currentTimeMillis();
private static long dayTs =
TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
@BeforeClass
public static void setup() throws Exception {
@ -984,7 +984,7 @@ public void testGetFlows() throws Exception {
assertEquals(1, entities.size());
long firstFlowActivity =
TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +

View File

@ -52,7 +52,7 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -172,7 +172,7 @@ public void testWriteFlowRunMinMax() throws Exception {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@ -303,7 +303,8 @@ private void checkFlowActivityTable(String cluster, String user, String flow,
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@ -388,7 +389,7 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
long dayTs =
TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivity.getDate().getTime());
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(3, flowRuns.size());
@ -443,7 +444,8 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result

View File

@ -62,7 +62,7 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -109,8 +109,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
assertTrue(HBaseTimelineStorageUtils.isFlowRunTable(
region.getRegionInfo(), hbaseConf));
}
}
@ -124,8 +124,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
region.getRegionInfo(), hbaseConf));
}
}
@ -139,8 +139,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
region.getRegionInfo(), hbaseConf));
}
}
}

View File

@ -54,8 +54,8 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.junit.AfterClass;
import org.junit.Assert;
@ -417,8 +417,8 @@ public void checkProcessSummationMoreCellsSumFinal2()
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
@ -427,8 +427,8 @@ public void checkProcessSummationMoreCellsSumFinal2()
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
@ -437,8 +437,8 @@ public void checkProcessSummationMoreCellsSumFinal2()
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
@ -447,8 +447,8 @@ public void checkProcessSummationMoreCellsSumFinal2()
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
currentColumnCells.add(c4);
List<Cell> cells =
@ -517,7 +517,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
@ -531,7 +531,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
@ -608,7 +608,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
@ -622,7 +622,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinalNotExpire++;
@ -636,7 +636,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
@ -693,8 +693,8 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
120L, Bytes.toBytes(cellValue1), tagByteArray);
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, 120L, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
@ -704,8 +704,8 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
130L, Bytes.toBytes(cellValue2), tagByteArray);
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, 130L, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
new LongConverter(), currentTimestamp);
@ -751,8 +751,8 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
120L, Bytes.toBytes(1110L), tagByteArray);
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, 120L, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
@ -789,8 +789,8 @@ public void testProcessSummationOneCell() throws IOException {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
new LongConverter(), currentTimestamp);

View File

@ -54,7 +54,8 @@ public byte[] encode(String appIdStr) {
byte[] clusterTs = Bytes.toBytes(
LongConverter.invertLong(appId.getClusterTimestamp()));
System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
byte[] seqId = Bytes.toBytes(
HBaseTimelineStorageUtils.invertInt(appId.getId()));
System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
return appIdBytes;
}
@ -79,7 +80,7 @@ public String decode(byte[] appIdBytes) {
}
long clusterTs = LongConverter.invertLong(
Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
int seqId = TimelineStorageUtils.invertInt(
int seqId = HBaseTimelineStorageUtils.invertInt(
Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
return ApplicationId.newInstance(clusterTs, seqId).toString();
}

View File

@ -0,0 +1,243 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* A bunch of utility functions used in HBase TimelineService backend.
*/
public final class HBaseTimelineStorageUtils {
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
private static final Log LOG =
LogFactory.getLog(HBaseTimelineStorageUtils.class);
private HBaseTimelineStorageUtils() {
}
/**
* Combines the input array of attributes and the input aggregation operation
* into a new array of attributes.
*
* @param attributes Attributes to be combined.
* @param aggOp Aggregation operation.
* @return array of combined attributes.
*/
public static Attribute[] combineAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
Attribute[] combinedAttributes = new Attribute[newLength];
if (attributes != null) {
System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
}
if (aggOp != null) {
Attribute a2 = aggOp.getAttribute();
combinedAttributes[newLength - 1] = a2;
}
return combinedAttributes;
}
/**
* Returns a number for the new array size. The new array is the combination
* of input array of attributes and the input aggregation operation.
*
* @param attributes Attributes.
* @param aggOp Aggregation operation.
* @return the size for the new array
*/
private static int getNewLengthCombinedAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int oldLength = getAttributesLength(attributes);
int aggLength = getAppOpLength(aggOp);
return oldLength + aggLength;
}
private static int getAppOpLength(AggregationOperation aggOp) {
if (aggOp != null) {
return 1;
}
return 0;
}
private static int getAttributesLength(Attribute[] attributes) {
if (attributes != null) {
return attributes.length;
}
return 0;
}
/**
* Returns the first seen aggregation operation as seen in the list of input
* tags or null otherwise.
*
* @param tags list of HBase tags.
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
List<Tag> tags) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
for (Tag tag : tags) {
if (tag.getType() == aggOp.getTagType()) {
return aggOp;
}
}
}
return null;
}
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute Attribute from which tag has to be fetched.
* @return a HBase Tag.
*/
public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
// an Aggregation Dimension
// Get the Tag type from either
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
return t;
}
AggregationCompactionDimension aggCompactDim =
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
}
/**
* creates a new cell based on the input cell but with the new value.
*
* @param origCell Original cell
* @param newValue new cell value
* @return cell
* @throws IOException while creating new cell.
*/
public static Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
/**
* creates a cell with the given inputs.
*
* @param row row of the cell to be created
* @param family column family name of the new cell
* @param qualifier qualifier for the new cell
* @param ts timestamp of the new cell
* @param newValue value of the new cell
* @param tags tags in the new cell
* @return cell
* @throws IOException while creating the cell.
*/
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
long ts, byte[] newValue, byte[] tags) throws IOException {
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
newValue, tags);
}
/**
* returns app id from the list of tags.
*
* @param tags cell tags to be looked into
* @return App Id as the AggregationCompactionDimension
*/
public static String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
return appId;
}
}
return appId;
}
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
Configuration conf) {
String regionTableName = hRegionInfo.getTable().getNameAsString();
String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
FlowRunTable.DEFAULT_TABLE_NAME);
if (HBaseTimelineStorageUtils.LOG.isDebugEnabled()) {
HBaseTimelineStorageUtils.LOG.debug("regionTableName=" + regionTableName);
}
if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
if (HBaseTimelineStorageUtils.LOG.isDebugEnabled()) {
HBaseTimelineStorageUtils.LOG.debug(
"table is the flow run table!! " + flowRunTableName);
}
return true;
}
return false;
}
/**
* Converts an int into it's inverse int to be used in (row) keys
* where we want to have the largest int value in the top of the table
* (scans start at the largest int first).
*
* @param key value to be inverted so that the latest version will be first in
* a scan.
* @return inverted int
*/
public static int invertInt(int key) {
return Integer.MAX_VALUE - key;
}
/**
* returns the timestamp of that day's start (which is midnight 00:00:00 AM)
* for a given input timestamp.
*
* @param ts Timestamp.
* @return timestamp of that day's beginning (midnight)
*/
public static long getTopOfTheDayTimestamp(long ts) {
long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
return dayTimestamp;
}
}

View File

@ -20,22 +20,13 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@ -47,10 +38,6 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
* A bunch of utility functions used across TimelineReader and TimelineWriter.
@ -63,133 +50,6 @@ private TimelineStorageUtils() {
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
/**
* Converts an int into it's inverse int to be used in (row) keys
* where we want to have the largest int value in the top of the table
* (scans start at the largest int first).
*
* @param key value to be inverted so that the latest version will be first in
* a scan.
* @return inverted int
*/
public static int invertInt(int key) {
return Integer.MAX_VALUE - key;
}
/**
* returns the timestamp of that day's start (which is midnight 00:00:00 AM)
* for a given input timestamp.
*
* @param ts Timestamp.
* @return timestamp of that day's beginning (midnight)
*/
public static long getTopOfTheDayTimestamp(long ts) {
long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
return dayTimestamp;
}
/**
* Combines the input array of attributes and the input aggregation operation
* into a new array of attributes.
*
* @param attributes Attributes to be combined.
* @param aggOp Aggregation operation.
* @return array of combined attributes.
*/
public static Attribute[] combineAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
Attribute[] combinedAttributes = new Attribute[newLength];
if (attributes != null) {
System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
}
if (aggOp != null) {
Attribute a2 = aggOp.getAttribute();
combinedAttributes[newLength - 1] = a2;
}
return combinedAttributes;
}
/**
* Returns a number for the new array size. The new array is the combination
* of input array of attributes and the input aggregation operation.
*
* @param attributes Attributes.
* @param aggOp Aggregation operation.
* @return the size for the new array
*/
private static int getNewLengthCombinedAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int oldLength = getAttributesLength(attributes);
int aggLength = getAppOpLength(aggOp);
return oldLength + aggLength;
}
private static int getAppOpLength(AggregationOperation aggOp) {
if (aggOp != null) {
return 1;
}
return 0;
}
private static int getAttributesLength(Attribute[] attributes) {
if (attributes != null) {
return attributes.length;
}
return 0;
}
/**
* Returns the first seen aggregation operation as seen in the list of input
* tags or null otherwise.
*
* @param tags list of HBase tags.
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
List<Tag> tags) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
for (Tag tag : tags) {
if (tag.getType() == aggOp.getTagType()) {
return aggOp;
}
}
}
return null;
}
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute Attribute from which tag has to be fetched.
* @return a HBase Tag.
*/
public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
// an Aggregation Dimension
// Get the Tag type from either
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
return t;
}
AggregationCompactionDimension aggCompactDim =
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
}
/**
* Matches key-values filter. Used for relatesTo/isRelatedTo filters.
*
@ -516,71 +376,4 @@ public static boolean isIntegralValue(Object obj) {
(obj instanceof Long);
}
/**
* creates a new cell based on the input cell but with the new value.
*
* @param origCell Original cell
* @param newValue new cell value
* @return cell
* @throws IOException while creating new cell.
*/
public static Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
/**
* creates a cell with the given inputs.
*
* @param row row of the cell to be created
* @param family column family name of the new cell
* @param qualifier qualifier for the new cell
* @param ts timestamp of the new cell
* @param newValue value of the new cell
* @param tags tags in the new cell
* @return cell
* @throws IOException while creating the cell.
*/
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
long ts, byte[] newValue, byte[] tags) throws IOException {
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
newValue, tags);
}
/**
* returns app id from the list of tags.
*
* @param tags cell tags to be looked into
* @return App Id as the AggregationCompactionDimension
*/
public static String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
return appId;
}
}
return appId;
}
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
Configuration conf) {
String regionTableName = hRegionInfo.getTable().getNameAsString();
String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
FlowRunTable.DEFAULT_TABLE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("regionTableName=" + regionTableName);
}
if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" table is the flow run table!! " + flowRunTableName);
}
return true;
}
return false;
}
}

View File

@ -26,9 +26,9 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@ -144,8 +144,8 @@ public void store(byte[] rowKey,
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
Attribute[] combinedAttributes =
HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@ -269,8 +269,8 @@ public void store(byte[] rowKey,
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
Attribute[] combinedAttributes =
HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the flow activity table.
@ -59,7 +59,7 @@ protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
String flowName, boolean convertDayTsToTopOfDay) {
this.clusterId = clusterId;
if (convertDayTsToTopOfDay && (timestamp != null)) {
this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
this.dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
} else {
this.dayTs = timestamp;
}

View File

@ -25,9 +25,9 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@ -113,8 +113,8 @@ public void store(byte[] rowKey,
TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
Object inputValue, Attribute... attributes) throws IOException {
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, aggOp);
Attribute[] combinedAttributes =
HBaseTimelineStorageUtils.combineAttributes(attributes, aggOp);
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue, combinedAttributes);
}

View File

@ -26,10 +26,10 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@ -136,7 +136,7 @@ public void store(byte[] rowKey,
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@ -163,7 +163,7 @@ public void store(byte[] rowKey,
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}

View File

@ -48,7 +48,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
/**
@ -71,7 +71,7 @@ public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable(
region.getRegionInfo(), env.getConfiguration());
}
}
@ -107,7 +107,7 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
byte[] tagByteArray = Tag.fromList(tags);

View File

@ -45,9 +45,9 @@
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
@ -249,7 +249,7 @@ private AggregationOperation getCurrentAggOp(Cell cell) {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
}
/**
@ -323,7 +323,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String aggDim = TimelineStorageUtils
String aggDim = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
// if this agg dimension has already been seen,
@ -418,7 +418,8 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
sum = converter.add(sum, currentValue);
}
byte[] sumBytes = converter.encodeValue(sum);
Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
Cell sumCell =
HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell;
}
@ -460,7 +461,7 @@ List<Cell> processSummationMajorCompaction(
// if this is the existing flow sum cell
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String appId = TimelineStorageUtils
String appId = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
sum = converter.add(sum, currentValue);
@ -502,7 +503,7 @@ List<Cell> processSummationMajorCompaction(
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
Cell sumCell = TimelineStorageUtils.createNewCell(
Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),
CellUtil.cloneQualifier(anyCell),

View File

@ -185,7 +185,7 @@ public void testEntityRowKey() {
@Test
public void testFlowActivityRowKey() {
Long ts = 1459900830000L;
Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
byte[] byteRowKey =
new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);