YARN-5109. timestamps are stored unencoded causing parse errors (Varun Saxena via sjlee)

Sangjin Lee 2016-05-26 21:39:16 -07:00
parent d729e8211b
commit 7b8cfa5c2f
34 changed files with 1991 additions and 804 deletions


@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -50,25 +50,28 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@ -482,7 +485,6 @@ public class TestHBaseTimelineStorage {
}
}
@Test
public void testWriteNullApplicationToHBase() throws Exception {
TimelineEntities te = new TimelineEntities();
@ -494,7 +496,7 @@ public class TestHBaseTimelineStorage {
// add the info map in Timeline Entity
Map<String, Object> infoMap = new HashMap<String, Object>();
infoMap.put("infoMapKey1", "infoMapValue1");
infoMap.put("in fo M apK ey1", "infoMapValue1");
infoMap.put("infoMapKey2", 10);
entity.addInfo(infoMap);
@ -517,6 +519,7 @@ public class TestHBaseTimelineStorage {
// retrieve the row
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(cluster));
scan.setStopRow(Bytes.toBytes(cluster + "1"));
Connection conn = ConnectionFactory.createConnection(c1);
ResultScanner resultScanner = new ApplicationTable()
.getResultScanner(c1, conn, scan);
@ -626,7 +629,7 @@ public class TestHBaseTimelineStorage {
hbi.start();
String cluster = "cluster_test_write_app";
String user = "user1";
String flow = "some_flow_name";
String flow = "s!ome_f\tlow _n am!e";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
@ -670,7 +673,8 @@ public class TestHBaseTimelineStorage {
assertEquals(cTime, cTime1);
Map<String, Object> infoColumns =
ApplicationColumnPrefix.INFO.readResults(result);
ApplicationColumnPrefix.INFO.readResults(result,
StringKeyConverter.getInstance());
assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
@ -706,11 +710,13 @@ public class TestHBaseTimelineStorage {
// Configuration
Map<String, Object> configColumns =
ApplicationColumnPrefix.CONFIG.readResults(result);
ApplicationColumnPrefix.CONFIG.readResults(result,
StringKeyConverter.getInstance());
assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(
result, StringKeyConverter.getInstance());
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
matchMetrics(metricValues, metricMap);
@ -868,7 +874,8 @@ public class TestHBaseTimelineStorage {
assertEquals(cTime1, cTime);
Map<String, Object> infoColumns =
EntityColumnPrefix.INFO.readResults(result);
EntityColumnPrefix.INFO.readResults(result,
StringKeyConverter.getInstance());
assertEquals(infoMap, infoColumns);
// Remember isRelatedTo is of type Map<String, Set<String>>
@ -906,11 +913,12 @@ public class TestHBaseTimelineStorage {
// Configuration
Map<String, Object> configColumns =
EntityColumnPrefix.CONFIG.readResults(result);
EntityColumnPrefix.CONFIG.readResults(result, StringKeyConverter.getInstance());
assertEquals(conf, configColumns);
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
EntityColumnPrefix.METRIC.readResultsWithTimestamps(
result, StringKeyConverter.getInstance());
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
matchMetrics(metricValues, metricMap);
@ -963,7 +971,7 @@ public class TestHBaseTimelineStorage {
}
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, long runid, String appName, TimelineEntity te) {
String flow, Long runid, String appName, TimelineEntity te) {
EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
@ -978,7 +986,7 @@ public class TestHBaseTimelineStorage {
}
private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
String user, String flow, long runid, String appName) {
String user, String flow, Long runid, String appName) {
ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
@ -995,7 +1003,7 @@ public class TestHBaseTimelineStorage {
TimelineEvent event = new TimelineEvent();
String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
event.setId(eventId);
long expTs = 1436512802000L;
Long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
@ -1038,20 +1046,18 @@ public class TestHBaseTimelineStorage {
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
appName));
Map<?, Object> eventsResult =
ApplicationColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result);
Map<EventColumnName, Object> eventsResult =
ApplicationColumnPrefix.EVENT.readResults(result,
EventColumnNameConverter.getInstance());
// there should be only one event
assertEquals(1, eventsResult.size());
for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
EventColumnName eventColumnName = e.getKey();
// the qualifier is a compound key
// hence match individual values
byte[][] karr = (byte[][])e.getKey();
assertEquals(3, karr.length);
assertEquals(eventId, Bytes.toString(karr[0]));
assertEquals(
TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1]));
assertEquals(expKey, Bytes.toString(karr[2]));
assertEquals(eventId, eventColumnName.getId());
assertEquals(expTs, eventColumnName.getTimestamp());
assertEquals(expKey, eventColumnName.getInfoKey());
Object value = e.getValue();
// there should be only one timestamp and value
assertEquals(expVal, value.toString());
@ -1076,7 +1082,7 @@ public class TestHBaseTimelineStorage {
assertEquals(1, events.size());
for (TimelineEvent e : events) {
assertEquals(eventId, e.getId());
assertEquals(expTs, e.getTimestamp());
assertEquals(expTs, Long.valueOf(e.getTimestamp()));
Map<String,Object> info = e.getInfo();
assertEquals(1, info.size());
for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
@ -1095,9 +1101,9 @@ public class TestHBaseTimelineStorage {
@Test
public void testEventsWithEmptyInfo() throws IOException {
TimelineEvent event = new TimelineEvent();
String eventId = "foo_event_id";
String eventId = "foo_ev e nt_id";
event.setId(eventId);
long expTs = 1436512802000L;
Long expTs = 1436512802000L;
event.setTimestamp(expTs);
final TimelineEntity entity = new TimelineEntity();
@ -1142,21 +1148,19 @@ public class TestHBaseTimelineStorage {
assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
entity));
Map<?, Object> eventsResult =
EntityColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result);
Map<EventColumnName, Object> eventsResult =
EntityColumnPrefix.EVENT.readResults(result,
EventColumnNameConverter.getInstance());
// there should be only one event
assertEquals(1, eventsResult.size());
for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
EventColumnName eventColumnName = e.getKey();
// the qualifier is a compound key
// hence match individual values
byte[][] karr = (byte[][])e.getKey();
assertEquals(3, karr.length);
assertEquals(eventId, Bytes.toString(karr[0]));
assertEquals(TimelineStorageUtils.invertLong(expTs),
Bytes.toLong(karr[1]));
assertEquals(eventId, eventColumnName.getId());
assertEquals(expTs, eventColumnName.getTimestamp());
// key must be empty
assertEquals(0, karr[2].length);
assertNull(eventColumnName.getInfoKey());
Object value = e.getValue();
// value should be empty
assertEquals("", value.toString());
@ -1184,7 +1188,7 @@ public class TestHBaseTimelineStorage {
assertEquals(1, events.size());
for (TimelineEvent e : events) {
assertEquals(eventId, e.getId());
assertEquals(expTs, e.getTimestamp());
assertEquals(expTs, Long.valueOf(e.getTimestamp()));
Map<String,Object> info = e.getInfo();
assertTrue(info == null || info.isEmpty());
}
@ -1194,6 +1198,67 @@ public class TestHBaseTimelineStorage {
}
}
@Test
public void testEventsEscapeTs() throws IOException {
TimelineEvent event = new TimelineEvent();
String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
event.setId(eventId);
long expTs = 1463567041056L;
event.setTimestamp(expTs);
String expKey = "f==o o_e ve\tnt";
Object expVal = "test";
event.addInfo(expKey, expVal);
final TimelineEntity entity = new ApplicationEntity();
entity.setId(ApplicationId.newInstance(0, 1).toString());
entity.addEvent(event);
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
String cluster = "clus!ter_\ttest_ev ents";
String user = "user2";
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L;
String appName = "application_123465899910_2001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();
// read the timeline entity using the reader this time
TimelineEntity e1 = reader.getEntity(
new TimelineReaderContext(cluster, user, flow, runid, appName,
entity.getType(), entity.getId()),
new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
assertNotNull(e1);
// check the events
NavigableSet<TimelineEvent> events = e1.getEvents();
// there should be only one event
assertEquals(1, events.size());
for (TimelineEvent e : events) {
assertEquals(eventId, e.getId());
assertEquals(expTs, e.getTimestamp());
Map<String,Object> info = e.getInfo();
assertEquals(1, info.size());
for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
assertEquals(expKey, infoEntry.getKey());
assertEquals(expVal, infoEntry.getValue());
}
}
} finally {
if (hbi != null) {
hbi.stop();
hbi.close();
}
}
}
@Test
public void testNonIntegralMetricValues() throws IOException {
TimelineEntities teApp = new TimelineEntities();


@ -170,7 +170,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@ -194,7 +194,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivity.getCluster());
assertEquals(user, flowActivity.getUser());
assertEquals(flow, flowActivity.getFlowName());
assertEquals(dayTs, flowActivity.getDate().getTime());
assertEquals(dayTs, Long.valueOf(flowActivity.getDate().getTime()));
Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
assertEquals(1, flowRuns.size());
}
@ -294,7 +294,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
@ -429,7 +429,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowName());
long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result


@ -31,10 +31,14 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@ -205,6 +209,17 @@ public final class TimelineFilterUtils {
return singleColValFilter;
}
private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
String column) {
if (colPrefix == ApplicationColumnPrefix.EVENT ||
colPrefix == EntityColumnPrefix.EVENT) {
return EventColumnNameConverter.getInstance().encode(
new EventColumnName(column, null, null));
} else {
return StringKeyConverter.getInstance().encode(column);
}
}
/**
* Create a filter list of qualifier filters based on passed set of columns.
*
@ -219,8 +234,7 @@ public final class TimelineFilterUtils {
for (String column : columns) {
// For columns which have compound column qualifiers (e.g. events), we need
// to include the required separator.
byte[] compoundColQual =
colPrefix.getCompoundColQualBytes(column, (byte[])null);
byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
list.addFilter(new QualifierFilter(CompareOp.EQUAL,
new BinaryPrefixComparator(
colPrefix.getColumnPrefixBytes(compoundColQual))));
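
A hedged, illustrative sketch (not part of the patch) of what createColQualifierPrefix produces for an event column: encoding an EventColumnName whose timestamp and info key are null is expected to yield a byte prefix covering every cell stored for that event id, so the BinaryPrefixComparator above matches all of them. The event id below is made up.

    // Illustrative only: a qualifier filter for all cells of one event id.
    byte[] eventIdPrefix = EventColumnNameConverter.getInstance().encode(
        new EventColumnName("MY_EVENT_ID", null, null));
    Filter eventFilter = new QualifierFilter(CompareOp.EQUAL,
        new BinaryPrefixComparator(
            ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(eventIdPrefix)));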


@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@ -46,7 +44,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@ -194,7 +196,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
long activityTimeStamp) throws IOException {
byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
userId, flowName);
byte[] qualifier = GenericObjectMapper.write(flowRunId);
byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId);
FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
null, flowVersion,
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
@ -278,7 +280,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
Attribute... attributes) throws IOException {
for (TimelineMetric metric : metrics) {
String metricColumnQualifier = metric.getId();
byte[] metricColumnQualifier =
StringKeyConverter.getInstance().encode(metric.getId());
Map<Long, Number> timeseries = metric.getValues();
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
Long timestamp = timeseriesEntry.getKey();
@ -316,8 +319,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// id3?id4?id5
String compoundValue =
Separator.VALUES.joinEncoded(connectedEntity.getValue());
columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
compoundValue);
columnPrefix.store(rowKey, table,
StringKeyConverter.getInstance().encode(connectedEntity.getKey()),
null, compoundValue);
}
}
@ -337,7 +341,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
if (info != null) {
for (Map.Entry<String, Object> entry : info.entrySet()) {
ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
entry.getKey(), null, entry.getValue());
StringKeyConverter.getInstance().encode(entry.getKey()), null,
entry.getValue());
}
}
} else {
@ -349,8 +354,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
Map<String, Object> info = te.getInfo();
if (info != null) {
for (Map.Entry<String, Object> entry : info.entrySet()) {
EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
null, entry.getValue());
EntityColumnPrefix.INFO.store(rowKey, entityTable,
StringKeyConverter.getInstance().encode(entry.getKey()), null,
entry.getValue());
}
}
}
@ -365,11 +371,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return;
}
for (Map.Entry<String, String> entry : config.entrySet()) {
byte[] configKey =
StringKeyConverter.getInstance().encode(entry.getKey());
if (isApplication) {
ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
entry.getKey(), null, entry.getValue());
configKey, null, entry.getValue());
} else {
EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
null, entry.getValue());
}
}
@ -383,7 +391,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
boolean isApplication) throws IOException {
if (metrics != null) {
for (TimelineMetric metric : metrics) {
String metricColumnQualifier = metric.getId();
byte[] metricColumnQualifier =
StringKeyConverter.getInstance().encode(metric.getId());
Map<Long, Number> timeseries = metric.getValues();
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
Long timestamp = timeseriesEntry.getKey();
@ -416,41 +425,31 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
"! Using the current timestamp");
eventTimestamp = System.currentTimeMillis();
}
byte[] eventTs =
Bytes.toBytes(TimelineStorageUtils.invertLong(eventTimestamp));
EventColumnNameConverter converter =
EventColumnNameConverter.getInstance();
Map<String, Object> eventInfo = event.getInfo();
if ((eventInfo == null) || (eventInfo.size() == 0)) {
byte[] columnQualifierBytes = converter.encode(
new EventColumnName(eventId, eventTimestamp, null));
if (isApplication) {
byte[] compoundColumnQualifierBytes =
ApplicationColumnPrefix.EVENT.
getCompoundColQualBytes(eventId, eventTs, null);
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
compoundColumnQualifierBytes, null,
TimelineStorageUtils.EMPTY_BYTES);
columnQualifierBytes, null, Separator.EMPTY_BYTES);
} else {
byte[] compoundColumnQualifierBytes =
EntityColumnPrefix.EVENT.
getCompoundColQualBytes(eventId, eventTs, null);
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifierBytes, null,
TimelineStorageUtils.EMPTY_BYTES);
columnQualifierBytes, null, Separator.EMPTY_BYTES);
}
} else {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
// eventId?infoKey
byte[] infoKey = Bytes.toBytes(info.getKey());
// eventId=infoKey
byte[] columnQualifierBytes = converter.encode(
new EventColumnName(eventId, eventTimestamp,
info.getKey()));
if (isApplication) {
byte[] compoundColumnQualifierBytes =
ApplicationColumnPrefix.EVENT.
getCompoundColQualBytes(eventId, eventTs, infoKey);
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
compoundColumnQualifierBytes, null, info.getValue());
columnQualifierBytes, null, info.getValue());
} else {
byte[] compoundColumnQualifierBytes =
EntityColumnPrefix.EVENT.
getCompoundColQualBytes(eventId, eventTs, infoKey);
EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifierBytes, null, info.getValue());
columnQualifierBytes, null, info.getValue());
}
} // for info: eventInfo
}
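
As a hedged illustration of the change above (not part of the patch): the raw eventId!invertedTimestamp!infoKey byte concatenation is replaced by EventColumnNameConverter, which is expected to escape separator bytes in the encoded timestamp so the qualifier can be split back reliably. The values below are invented.

    EventColumnNameConverter converter = EventColumnNameConverter.getInstance();
    byte[] qualifier = converter.encode(
        new EventColumnName("FOO_EVENT", 1463567041056L, "someInfoKey"));
    // decode() recovers all three components; timestamps containing bytes
    // that look like separators no longer break parsing (YARN-5109).
    EventColumnName decoded = converter.decode(qualifier);
    assert "FOO_EVENT".equals(decoded.getId());
    assert decoded.getTimestamp() == 1463567041056L;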


@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@ -56,7 +57,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
/**
* Lifecycle events for an application.
*/
EVENT(ApplicationColumnFamily.INFO, "e", true),
EVENT(ApplicationColumnFamily.INFO, "e"),
/**
* Config column stores configuration with config key as the column name.
@ -78,7 +79,6 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private final boolean compoundColQual;
/**
* Private constructor, meant to be used by the enum definition.
@ -88,18 +88,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
String columnPrefix) {
this(columnFamily, columnPrefix, false, GenericConverter.getInstance());
}
private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
String columnPrefix, boolean compoundColQual) {
this(columnFamily, columnPrefix, compoundColQual,
GenericConverter.getInstance());
}
private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
String columnPrefix, ValueConverter converter) {
this(columnFamily, columnPrefix, false, converter);
this(columnFamily, columnPrefix, GenericConverter.getInstance());
}
/**
@ -111,7 +100,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
* this column prefix.
*/
private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
String columnPrefix, boolean compoundColQual, ValueConverter converter) {
String columnPrefix, ValueConverter converter) {
column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
@ -122,7 +111,6 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
this.compoundColQual = compoundColQual;
}
/**
@ -149,15 +137,6 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return columnFamily.getBytes();
}
@Override
public byte[] getCompoundColQualBytes(String qualifier,
byte[]...components) {
if (!compoundColQual) {
return ColumnHelper.getColumnQualifier(null, qualifier);
}
return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
}
/*
* (non-Javadoc)
*
@ -232,25 +211,12 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result)
* #readResults(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public Map<String, Object> readResults(Result result) throws IOException {
return column.readResults(result, columnPrefixBytes);
}
/**
* @param result from which to read columns
* @return the latest values of columns in the column family. The column
* qualifier is returned as a list of parts, each part a byte[]. This
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException if any problem occurs while reading results.
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
return column.readResultsHavingCompoundColumnQualifiers(result,
columnPrefixBytes);
public <K> Map<K, Object> readResults(Result result,
KeyConverter<K> keyConverter) throws IOException {
return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@ -258,11 +224,14 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes);
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes,
keyConverter);
}
/**


@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
/**
* Represents a rowkey for the application table.
@ -28,11 +25,11 @@ public class ApplicationRowKey {
private final String clusterId;
private final String userId;
private final String flowName;
private final long flowRunId;
private final Long flowRunId;
private final String appId;
public ApplicationRowKey(String clusterId, String userId, String flowName,
long flowRunId, String appId) {
Long flowRunId, String appId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowName = flowName;
@ -52,7 +49,7 @@ public class ApplicationRowKey {
return flowName;
}
public long getFlowRunId() {
public Long getFlowRunId() {
return flowRunId;
}
@ -71,9 +68,8 @@ public class ApplicationRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName) {
byte[] first = Bytes.toBytes(
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName));
return Separator.QUALIFIERS.join(first, new byte[0]);
return ApplicationRowKeyConverter.getInstance().encode(
new ApplicationRowKey(clusterId, userId, flowName, null, null));
}
/**
@ -88,10 +84,8 @@ public class ApplicationRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId) {
byte[] first = Bytes.toBytes(
Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName));
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
return Separator.QUALIFIERS.join(first, second, new byte[0]);
return ApplicationRowKeyConverter.getInstance().encode(
new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null));
}
/**
@ -107,14 +101,8 @@ public class ApplicationRowKey {
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowName, Long flowRunId, String appId) {
byte[] first =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
flowName));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
byte[] third = TimelineStorageUtils.encodeAppId(appId);
return Separator.QUALIFIERS.join(first, second, third);
return ApplicationRowKeyConverter.getInstance().encode(
new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId));
}
/**
@ -124,22 +112,6 @@ public class ApplicationRowKey {
* @return An <cite>ApplicationRowKey</cite> object.
*/
public static ApplicationRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 5) {
throw new IllegalArgumentException("the row key is not valid for " +
"an application");
}
String clusterId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
String userId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
String flowName =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId);
return ApplicationRowKeyConverter.getInstance().decode(rowKey);
}
}


@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Encodes and decodes row key for application table.
* The row key is of the form: clusterId!userName!flowName!flowRunId!appId.
* flowRunId is a long, appId is encoded and decoded using
* {@link AppIdKeyConverter} and rest are strings.
*/
public final class ApplicationRowKeyConverter implements
KeyConverter<ApplicationRowKey> {
private static final ApplicationRowKeyConverter INSTANCE =
new ApplicationRowKeyConverter();
public static ApplicationRowKeyConverter getInstance() {
return INSTANCE;
}
private ApplicationRowKeyConverter() {
}
// Application row key is of the form
// clusterId!userName!flowName!flowRunId!appId with each segment separated
// by !. The sizes below indicate the sizes of each of these segments in
// sequence. clusterId, userName and flowName are strings. flowRunId is a long,
// hence 8 bytes in size. App id is represented as 12 bytes, with the cluster
// timestamp part of the app id being 8 bytes (long) and the seq id being 4
// bytes (int).
// Strings are variable in size (i.e. end whenever separator is encountered).
// This is used while decoding and helps in determining where to split.
private static final int[] SEGMENT_SIZES = {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize() };
/*
* (non-Javadoc)
*
* Encodes ApplicationRowKey object into a byte array with each
* component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
* This leads to an application table row key of the form
* clusterId!userName!flowName!flowRunId!appId
* If flowRunId in the passed ApplicationRowKey object is null (and the fields
* preceding it i.e. clusterId, userId and flowName are not null), this
* returns a row key prefix of the form clusterId!userName!flowName! and if
* appId in ApplicationRowKey is null (other 4 components are not null), this
* returns a row key prefix of the form clusterId!userName!flowName!flowRunId!
* flowRunId is inverted while encoding as it helps maintain a descending
* order for row keys in the application table.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(ApplicationRowKey rowKey) {
byte[] cluster = Separator.encode(rowKey.getClusterId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] user = Separator.encode(rowKey.getUserId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] flow = Separator.encode(rowKey.getFlowName(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
if (rowKey.getFlowRunId() == null) {
return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
}
byte[] second = Bytes.toBytes(
TimelineStorageUtils.invertLong(rowKey.getFlowRunId()));
if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
}
byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
return Separator.QUALIFIERS.join(first, second, third);
}
/*
* (non-Javadoc)
*
* Decodes an application row key of the form
* clusterId!userName!flowName!flowRunId!appId represented in byte format and
* converts it into an ApplicationRowKey object. flowRunId is inverted while
* decoding as it was inverted while encoding.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public ApplicationRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 5) {
throw new IllegalArgumentException("the row key is not valid for " +
"an application");
}
String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
Long flowRunId =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId);
}
}
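
A hedged usage sketch of the converter above (identifiers invented): a full five-part key round-trips through encode/decode, while null trailing fields produce the scan prefixes described in the encode javadoc.

    ApplicationRowKeyConverter conv = ApplicationRowKeyConverter.getInstance();
    String appId = ApplicationId.newInstance(0, 1).toString();
    byte[] rowKey = conv.encode(new ApplicationRowKey(
        "cluster1", "user1", "some_flow", 1002345678919L, appId));
    ApplicationRowKey parsed = conv.decode(rowKey);  // all five parts restored
    // A null flowRunId (and appId) yields clusterId!userName!flowName! for
    // prefix scans:
    byte[] flowPrefix = conv.encode(
        new ApplicationRowKey("cluster1", "user1", "some_flow", null, null));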


@ -17,10 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the app_flow table.
*/
@ -50,9 +46,8 @@ public class AppToFlowRowKey {
* @return byte array with the row key
*/
public static byte[] getRowKey(String clusterId, String appId) {
byte[] first = Bytes.toBytes(clusterId);
byte[] second = TimelineStorageUtils.encodeAppId(appId);
return Separator.QUALIFIERS.join(first, second);
return AppToFlowRowKeyConverter.getInstance().encode(
new AppToFlowRowKey(clusterId, appId));
}
/**
@ -62,15 +57,6 @@ public class AppToFlowRowKey {
* @return an <cite>AppToFlowRowKey</cite> object.
*/
public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 2) {
throw new IllegalArgumentException("the row key is not valid for " +
"the app-to-flow table");
}
String clusterId = Bytes.toString(rowKeyComponents[0]);
String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]);
return new AppToFlowRowKey(clusterId, appId);
return AppToFlowRowKeyConverter.getInstance().decode(rowKey);
}
}


@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Encodes and decodes row key for app_flow table.
* The row key is of the form: clusterId!appId.
* clusterId is a string and appId is encoded/decoded using
* {@link AppIdKeyConverter}.
*/
public final class AppToFlowRowKeyConverter
implements KeyConverter<AppToFlowRowKey> {
private static final AppToFlowRowKeyConverter INSTANCE =
new AppToFlowRowKeyConverter();
public static AppToFlowRowKeyConverter getInstance() {
return INSTANCE;
}
private AppToFlowRowKeyConverter() {
}
// App to flow row key is of the form clusterId!appId with the 2 segments
// separated by !. The sizes below indicate sizes of both of these segments
// in sequence. clusterId is a string. appId is represented as 12 bytes with
// the cluster timestamp part of the app id being 8 bytes (long) and the seq
// id being 4 bytes (int).
// Strings are variable in size (i.e. end whenever separator is encountered).
// This is used while decoding and helps in determining where to split.
private static final int[] SEGMENT_SIZES = {
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
/*
* (non-Javadoc)
*
* Encodes AppToFlowRowKey object into a byte array with each component/field
* in AppToFlowRowKey separated by Separator#QUALIFIERS. This leads to an
* app to flow table row key of the form clusterId!appId
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(AppToFlowRowKey rowKey) {
byte[] first = Separator.encode(rowKey.getClusterId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
return Separator.QUALIFIERS.join(first, second);
}
/*
* (non-Javadoc)
*
* Decodes an app to flow row key of the form clusterId!appId represented in
* byte format and converts it into an AppToFlowRowKey object.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public AppToFlowRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 2) {
throw new IllegalArgumentException("the row key is not valid for " +
"the app-to-flow table");
}
String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]);
return new AppToFlowRowKey(clusterId, appId);
}
}
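
The Separator.encode/decode pair used above is central to how this patch keeps user-supplied strings from colliding with the row key separators: characters such as '!', space and tab are escaped before segments are joined, and unescaped on the way back out. A minimal sketch, reusing a string from the tests earlier in this commit:

    String raw = "clus!ter_\ttest_ev ents";
    byte[] escaped = Separator.encode(raw,
        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
    String restored = Separator.decode(Bytes.toString(escaped),
        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
    assert raw.equals(restored);  // special characters survive the round trip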


@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Encodes and decodes {@link ApplicationId} for row keys.
* App ID is stored in row key as 12 bytes, cluster timestamp section of app id
* (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes).
*/
public final class AppIdKeyConverter implements KeyConverter<String> {
private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter();
public static AppIdKeyConverter getInstance() {
return INSTANCE;
}
private AppIdKeyConverter() {
}
/*
* (non-Javadoc)
*
* Converts/encodes a string app Id into a byte representation for (row) keys.
* For conversion, we extract cluster timestamp and sequence id from the
* string app id (calls ConverterUtils#toApplicationId(String) for
* conversion) and then store it in a byte array of length 12 (8 bytes (long)
* for cluster timestamp followed by 4 bytes (int) for sequence id). Both cluster
* timestamp and sequence id are inverted so that the most recent cluster
* timestamp and highest sequence id appears first in the table (i.e.
* application id appears in a descending order).
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(String appIdStr) {
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
byte[] appIdBytes = new byte[getKeySize()];
byte[] clusterTs = Bytes.toBytes(
TimelineStorageUtils.invertLong(appId.getClusterTimestamp()));
System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
return appIdBytes;
}
/*
* (non-Javadoc)
*
* Converts/decodes a 12-byte representation of app id for (row) keys to an
* app id in string format which can be returned back to the client.
* For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
* timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
* ApplicationId#toString to generate string representation of app id.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public String decode(byte[] appIdBytes) {
if (appIdBytes.length != getKeySize()) {
throw new IllegalArgumentException("Invalid app id in byte format");
}
long clusterTs = TimelineStorageUtils.invertLong(
Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
int seqId = TimelineStorageUtils.invertInt(
Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
return ApplicationId.newInstance(clusterTs, seqId).toString();
}
/**
* Returns the size of app id after encoding.
*
* @return size of app id after encoding.
*/
public static int getKeySize() {
return Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
}
}
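
A short sketch of the fixed-width layout described above (values invented): the encoded key is always getKeySize() (12) bytes, both parts are stored inverted so newer ids sort first, and decode restores the original app id string.

    AppIdKeyConverter conv = AppIdKeyConverter.getInstance();
    String appId = ApplicationId.newInstance(1463567041056L, 24).toString();
    byte[] key = conv.encode(appId);  // 8-byte inverted ts + 4-byte inverted seq
    assert key.length == AppIdKeyConverter.getKeySize();
    assert appId.equals(conv.decode(key));  // round trip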


@ -166,19 +166,22 @@ public class ColumnHelper<T> {
* @param result from which to read data with timestamps
* @param columnPrefixBytes optional prefix to limit columns. If null all
* columns are returned.
* @param <K> identifies the type of column name (indicated by the type of key
* converter).
* @param <V> the type of the values. The values will be cast into that type.
* @param keyConverter used to convert column bytes to the appropriate key
* type.
* @return the cell values at each respective time in the form
* {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}}}
* @throws IOException if any problem occurs while reading results.
*/
@SuppressWarnings("unchecked")
public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, byte[] columnPrefixBytes)
throws IOException {
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, byte[] columnPrefixBytes,
KeyConverter<K> keyConverter) throws IOException {
NavigableMap<String, NavigableMap<Long, V>> results =
new TreeMap<String, NavigableMap<Long, V>>();
NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>();
if (result != null) {
NavigableMap<
@ -192,13 +195,17 @@ public class ColumnHelper<T> {
if (columnCellMap != null) {
for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
.entrySet()) {
String columnName = null;
K converterColumnKey = null;
if (columnPrefixBytes == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("null prefix was specified; returning all columns");
}
// Decode the spaces we encoded in the column name.
columnName = Separator.decode(entry.getKey(), Separator.SPACE);
try {
converterColumnKey = keyConverter.decode(entry.getKey());
} catch (IllegalArgumentException iae) {
LOG.error("Illegal column found, skipping this column.", iae);
continue;
}
} else {
// A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder
@ -207,13 +214,18 @@ public class ColumnHelper<T> {
byte[] actualColumnPrefixBytes = columnNameParts[0];
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) {
// This is the prefix that we want
columnName = Separator.decode(columnNameParts[1]);
try {
// This is the prefix that we want
converterColumnKey = keyConverter.decode(columnNameParts[1]);
} catch (IllegalArgumentException iae) {
LOG.error("Illegal column found, skipping this column.", iae);
continue;
}
}
}
// If this column has the prefix we want
if (columnName != null) {
if (converterColumnKey != null) {
NavigableMap<Long, V> cellResults =
new TreeMap<Long, V>();
NavigableMap<Long, byte[]> cells = entry.getValue();
@ -226,7 +238,7 @@ public class ColumnHelper<T> {
value);
}
}
results.put(columnName, cellResults);
results.put(converterColumnKey, cellResults);
}
} // for entry : columnCellMap
} // if columnCellMap != null
@ -235,66 +247,13 @@ public class ColumnHelper<T> {
}
/**
* @param <K> identifies the type of column name (indicated by the type of key
* converter).
* @param result from which to read columns
* @param columnPrefixBytes optional prefix to limit columns. If null all
* columns are returned.
* @return the latest values of columns in the column family. This assumes
* that the column name parts are all Strings by default. If the
* column name parts should be treated natively and not be converted
* back and forth from Strings, you should use
* {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])}
* instead.
* @throws IOException if any problem occurs while reading results.
*/
public Map<String, Object> readResults(Result result,
byte[] columnPrefixBytes) throws IOException {
Map<String, Object> results = new HashMap<String, Object>();
if (result != null) {
Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
for (Entry<byte[], byte[]> entry : columns.entrySet()) {
byte[] columnKey = entry.getKey();
if (columnKey != null && columnKey.length > 0) {
String columnName = null;
if (columnPrefixBytes == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("null prefix was specified; returning all columns");
}
// Decode the spaces we encoded in the column name.
columnName = Separator.decode(columnKey, Separator.SPACE);
} else {
// A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder
byte[][] columnNameParts =
Separator.QUALIFIERS.split(columnKey, 2);
byte[] actualColumnPrefixBytes = columnNameParts[0];
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) {
// This is the prefix that we want
// if the column name is a compound qualifier
// with non string datatypes, the following decode will not
// work correctly since it considers all components to be String
// invoke the readResultsHavingCompoundColumnQualifiers function
columnName = Separator.decode(columnNameParts[1]);
}
}
// If this column has the prefix we want
if (columnName != null) {
Object value = converter.decodeValue(entry.getValue());
results.put(columnName, value);
}
}
} // for entry
}
return results;
}
/**
* @param result from which to read columns
* @param columnPrefixBytes optional prefix to limit columns. If null all
* columns are returned.
* @param keyConverter used to convert column bytes to the appropriate key
* type.
* @return the latest values of columns in the column family. If the column
* prefix is null, the column qualifier is returned as Strings. For a
* non-null column prefix bytes, the column qualifier is returned as
@ -302,38 +261,51 @@ public class ColumnHelper<T> {
* returning byte arrays of values that were not Strings.
* @throws IOException if any problem occurs while reading results.
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result,
byte[] columnPrefixBytes) throws IOException {
// handle the case where the column prefix is null
// it is the same as readResults() so simply delegate to that implementation
if (columnPrefixBytes == null) {
return readResults(result, null);
}
Map<byte[][], Object> results = new HashMap<byte[][], Object>();
public <K> Map<K, Object> readResults(Result result,
byte[] columnPrefixBytes, KeyConverter<K> keyConverter)
throws IOException {
Map<K, Object> results = new HashMap<K, Object>();
if (result != null) {
Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
for (Entry<byte[], byte[]> entry : columns.entrySet()) {
byte[] columnKey = entry.getKey();
if (columnKey != null && columnKey.length > 0) {
// A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder
// with a compound column qualifier, we are presuming existence of a
// prefix
byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
if (columnNameParts.length > 0) {
byte[] actualColumnPrefixBytes = columnNameParts[0];
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) {
// This is the prefix that we want
byte[][] columnQualifierParts =
Separator.VALUES.split(columnNameParts[1]);
Object value = converter.decodeValue(entry.getValue());
// we return the columnQualifier in parts since we don't know
// which part is of which data type
results.put(columnQualifierParts, value);
K converterColumnKey = null;
if (columnPrefixBytes == null) {
try {
converterColumnKey = keyConverter.decode(columnKey);
} catch (IllegalArgumentException iae) {
LOG.error("Illegal column found, skipping this column.", iae);
continue;
}
} else {
// A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder
byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
if (columnNameParts.length > 0) {
byte[] actualColumnPrefixBytes = columnNameParts[0];
// If this is the prefix that we want
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) {
try {
converterColumnKey = keyConverter.decode(columnNameParts[1]);
} catch (IllegalArgumentException iae) {
LOG.error("Illegal column found, skipping this column.", iae);
continue;
}
}
}
} // if-else
// If the columnPrefix is null (we want all columns), or the actual
// prefix matches the given prefix, we want this column
if (converterColumnKey != null) {
Object value = converter.decodeValue(entry.getValue());
results.put(converterColumnKey, value);
}
}
} // for entry
@ -353,8 +325,9 @@ public class ColumnHelper<T> {
public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
String qualifier) {
// We don't want column names to have spaces
byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier));
// We don't want column names to have spaces / tabs.
byte[] encodedQualifier =
Separator.encode(qualifier, Separator.SPACE, Separator.TAB);
if (columnPrefixBytes == null) {
return encodedQualifier;
}
@ -366,22 +339,6 @@ public class ColumnHelper<T> {
return columnQualifier;
}
/**
* Create a compound column qualifier by combining qualifier and components.
*
 * @param qualifier Column Qualifier.
* @param components Other components.
* @return a byte array representing compound column qualifier.
*/
public static byte[] getCompoundColumnQualifierBytes(String qualifier,
byte[]...components) {
byte[] colQualBytes = Bytes.toBytes(Separator.VALUES.encode(qualifier));
for (int i = 0; i < components.length; i++) {
colQualBytes = Separator.VALUES.join(colQualBytes, components[i]);
}
return colQualBytes;
}
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.


@ -91,37 +91,33 @@ public interface ColumnPrefix<T> {
Object readResult(Result result, String qualifier) throws IOException;
/**
* @param result from which to read columns
*
* @param <K> identifies the type of key converter.
* @param result from which to read columns.
* @param keyConverter used to convert column bytes to the appropriate key
* type
* @return the latest values of columns in the column family with this prefix
* (or all of them if the prefix value is null).
* @throws IOException if there is any exception encountered while reading
* results.
* results.
*/
Map<String, Object> readResults(Result result) throws IOException;
<K> Map<K, Object> readResults(Result result, KeyConverter<K> keyConverter)
throws IOException;
/**
* @param result from which to reads data with timestamps
 * @param result from which to read data with timestamps.
* @param <K> identifies the type of key converter.
* @param <V> the type of the values. The values will be cast into that type.
* @param keyConverter used to convert column bytes to the appropriate key
* type.
 * @return the cell values at each respective time in the form
* {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
* idB={timestamp3->value3}, idC={timestamp1->value4}}}
* @throws IOException if there is any exception encountered while reading
* result.
*/
<V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
/**
* @param result from which to read columns
* @return the latest values of columns in the column family. The column
* qualifier is returned as a list of parts, each part a byte[]. This
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException if any problem occurs while reading results.
*/
Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException;
<K, V> NavigableMap<K, NavigableMap<Long, V>> readResultsWithTimestamps(
Result result, KeyConverter<K> keyConverter) throws IOException;
/**
* @param qualifierPrefix Column qualifier or prefix of qualifier.
@ -146,15 +142,4 @@ public interface ColumnPrefix<T> {
* @return a {@link ValueConverter} implementation.
*/
ValueConverter getValueConverter();
/**
* Get compound column qualifier bytes if the column qualifier is a compound
* qualifier. Returns the qualifier passed as bytes if the column is not a
* compound column qualifier.
*
* @param qualifier Column Qualifier.
* @param components Other components.
* @return byte array representing compound column qualifier.
*/
byte[] getCompoundColQualBytes(String qualifier, byte[]...components);
}


@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
/**
* Encapsulates information about Event column names for application and entity
* tables. Used while encoding/decoding event column names.
*/
public class EventColumnName {
private final String id;
private final Long timestamp;
private final String infoKey;
public EventColumnName(String id, Long timestamp, String infoKey) {
this.id = id;
this.timestamp = timestamp;
this.infoKey = infoKey;
}
public String getId() {
return id;
}
public Long getTimestamp() {
return timestamp;
}
public String getInfoKey() {
return infoKey;
}
}


@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Encodes and decodes event column names for application and entity tables.
 * The event column name is of the form: eventId=timestamp=infokey.
 * If no info is associated with the event, the event column name is of the
 * form: eventId=timestamp=
 * The event timestamp is a long and the rest are strings.
 * Column prefixes are not part of the event column name passed for encoding.
 * They are added later, if required, in the associated ColumnPrefix
 * implementations.
*/
public final class EventColumnNameConverter
implements KeyConverter<EventColumnName> {
private static final EventColumnNameConverter INSTANCE =
new EventColumnNameConverter();
public static EventColumnNameConverter getInstance() {
return INSTANCE;
}
private EventColumnNameConverter() {
}
// eventId=timestamp=infokey are of types String, Long, String
// Strings are variable in size (i.e. end whenever separator is encountered).
// This is used while decoding and helps in determining where to split.
private static final int[] SEGMENT_SIZES = {
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
/*
* (non-Javadoc)
*
* Encodes EventColumnName into a byte array with each component/field in
* EventColumnName separated by Separator#VALUES. This leads to an event
* column name of the form eventId=timestamp=infokey.
 * If the timestamp in the passed EventColumnName object is null (and eventId
 * is not null), this returns a column prefix of the form eventId= and if the
 * infokey in EventColumnName is null (the other 2 components are not null),
 * this returns a column name of the form eventId=timestamp=
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(EventColumnName key) {
byte[] first = Separator.encode(key.getId(), Separator.SPACE, Separator.TAB,
Separator.VALUES);
if (key.getTimestamp() == null) {
return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
}
byte[] second = Bytes.toBytes(
TimelineStorageUtils.invertLong(key.getTimestamp()));
if (key.getInfoKey() == null) {
return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
}
return Separator.VALUES.join(first, second, Separator.encode(
key.getInfoKey(), Separator.SPACE, Separator.TAB, Separator.VALUES));
}
/*
* (non-Javadoc)
*
* Decodes an event column name of the form eventId=timestamp= or
* eventId=timestamp=infoKey represented in byte format and converts it into
* an EventColumnName object.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public EventColumnName decode(byte[] bytes) {
byte[][] components = Separator.VALUES.split(bytes, SEGMENT_SIZES);
if (components.length != 3) {
throw new IllegalArgumentException("the column name is not valid");
}
String id = Separator.decode(Bytes.toString(components[0]),
Separator.VALUES, Separator.TAB, Separator.SPACE);
Long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
String infoKey = components[2].length == 0 ? null :
Separator.decode(Bytes.toString(components[2]),
Separator.VALUES, Separator.TAB, Separator.SPACE);
return new EventColumnName(id, ts, infoKey);
}
}
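To make the encoding concrete, here is a minimal editorial sketch (not part of this commit; the class name and sample values are assumptions) that round-trips an event column name through the converter above:
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
public class EventColumnNameRoundTrip {
  public static void main(String[] args) {
    EventColumnNameConverter converter = EventColumnNameConverter.getInstance();
    // Encode eventId=timestamp=infokey; the timestamp is written as an
    // inverted fixed-width 8-byte long, so separator bytes occurring inside
    // its raw representation can no longer corrupt the parse.
    byte[] columnName = converter.encode(new EventColumnName(
        "YARN_APPLICATION_CREATED", 1464300000000L, "diagnostics"));
    EventColumnName decoded = converter.decode(columnName);
    System.out.println(decoded.getId());        // YARN_APPLICATION_CREATED
    System.out.println(decoded.getTimestamp()); // 1464300000000
    System.out.println(decoded.getInfoKey());   // diagnostics
  }
}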


@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
/**
* Interface which has to be implemented for encoding and decoding row keys and
* columns.
*/
public interface KeyConverter<T> {
/**
* Encodes a key as a byte array.
*
* @param key key to be encoded.
* @return a byte array.
*/
byte[] encode(T key);
/**
* Decodes a byte array and returns a key of type T.
*
* @param bytes byte representation
 * @return an object (key) of type T which has been constructed after decoding
* the bytes.
*/
T decode(byte[] bytes);
}


@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
/**
* Encodes and decodes column names / row keys which are long.
*/
public final class LongKeyConverter implements KeyConverter<Long> {
private static final LongKeyConverter INSTANCE = new LongKeyConverter();
public static LongKeyConverter getInstance() {
return INSTANCE;
}
private LongKeyConverter() {
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(Long key) {
try {
// IOException will not be thrown here as we are explicitly passing
// Long.
return LongConverter.getInstance().encodeValue(key);
} catch (IOException e) {
return null;
}
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public Long decode(byte[] bytes) {
try {
return (Long) LongConverter.getInstance().decodeValue(bytes);
} catch (IOException e) {
return null;
}
}
}


@ -45,7 +45,13 @@ public enum Separator {
* getting a + for a space, which may already occur in strings, so we don't
* want that.
*/
SPACE(" ", "%2$");
SPACE(" ", "%2$"),
/**
* separator in values, often used to avoid having these in qualifiers and
* names.
*/
TAB("\t", "%3$");
/**
* The string value of this separator.
@ -67,7 +73,22 @@ public enum Separator {
*/
private final String quotedValue;
private static final byte[] EMPTY_BYTES = new byte[0];
/**
* Indicator for variable size of an individual segment in a split. The
 * segment ends wherever the separator is encountered.
 * Typically used for strings.
 * Also used to indicate that there is no fixed number of splits which need to
 * be returned. If the split limit is specified as this, all possible splits are
* returned.
*/
public static final int VARIABLE_SIZE = 0;
/** empty string. */
public static final String EMPTY_STRING = "";
/** empty bytes. */
public static final byte[] EMPTY_BYTES = new byte[0];
/**
* @param value of the separator to use. Cannot be null or empty string.
@ -222,7 +243,6 @@ public enum Separator {
System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
offset += this.value.length();
}
}
return buf;
}
@ -307,7 +327,25 @@ public enum Separator {
* @return source split by this separator.
*/
public byte[][] split(byte[] source, int limit) {
return TimelineStorageUtils.split(source, this.bytes, limit);
return split(source, this.bytes, limit);
}
/**
* Splits the source array into multiple array segments using this separator.
* The sizes indicate the sizes of the relative components/segments.
 * If one of the segments contains this separator before the specified size
 * is reached, the separator is considered part of that segment and parsing
 * continues until the size is reached.
 * Variable length strings cannot contain this separator and are indicated
 * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
 * separator and decoded after the results of the split are returned.
 *
 * @param source byte array to be split.
 * @param sizes sizes of relative components/segments.
 * @return source split by this separator as per the sizes specified.
*/
public byte[][] split(byte[] source, int[] sizes) {
return split(source, this.bytes, sizes);
}
/**
@ -315,10 +353,158 @@ public enum Separator {
* as many times as splits are found. This will naturally produce copied byte
* arrays for each of the split segments.
*
* @param source to be split
* @param source byte array to be split
* @return source split by this separator.
*/
public byte[][] split(byte[] source) {
return TimelineStorageUtils.split(source, this.bytes);
return split(source, this.bytes);
}
/**
* Returns a list of ranges identifying [start, end) -- closed, open --
* positions within the source byte array that would be split using the
* separator byte array.
* The sizes indicate the sizes of the relative components/segments.
 * If one of the segments contains this separator before the specified size
 * is reached, the separator is considered part of that segment and parsing
 * continues until the size is reached.
 * Variable length strings cannot contain this separator and are indicated
 * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
 * separator and decoded after the results of the split are returned.
*
* @param source the source data
* @param separator the separator pattern to look for
* @param sizes indicate the sizes of the relative components/segments.
* @return a list of ranges.
*/
private static List<Range> splitRanges(byte[] source, byte[] separator,
int[] sizes) {
List<Range> segments = new ArrayList<Range>();
if (source == null || separator == null) {
return segments;
}
// VARIABLE_SIZE here indicates that there is no limit to number of segments
// to return.
int limit = VARIABLE_SIZE;
if (sizes != null && sizes.length > 0) {
limit = sizes.length;
}
int start = 0;
int currentSegment = 0;
itersource: for (int i = 0; i < source.length; i++) {
for (int j = 0; j < separator.length; j++) {
if (source[i + j] != separator[j]) {
continue itersource;
}
}
// all separator elements matched
if (limit > VARIABLE_SIZE) {
if (segments.size() >= (limit - 1)) {
// everything else goes in one final segment
break;
}
if (sizes != null) {
int currentSegExpectedSize = sizes[currentSegment];
if (currentSegExpectedSize > VARIABLE_SIZE) {
int currentSegSize = i - start;
if (currentSegSize < currentSegExpectedSize) {
// Segment not yet complete. More bytes to parse.
continue itersource;
} else if (currentSegSize > currentSegExpectedSize) {
// Segment is not as per size.
throw new IllegalArgumentException(
"Segments not separated as per expected sizes");
}
}
}
}
segments.add(new Range(start, i));
start = i + separator.length;
// i will be incremented again in outer for loop
i += separator.length - 1;
currentSegment++;
}
// add in remaining to a final range
if (start <= source.length) {
if (sizes != null) {
// Check if final segment is as per size specified.
if (sizes[currentSegment] > VARIABLE_SIZE &&
source.length - start > sizes[currentSegment]) {
// Segment is not as per size.
throw new IllegalArgumentException(
"Segments not separated as per expected sizes");
}
}
segments.add(new Range(start, source.length));
}
return segments;
}
/**
* Splits based on segments calculated based on limit/sizes specified for the
* separator.
*
* @param source byte array to be split.
* @param segments specifies the range for each segment.
* @return a byte[][] split as per the segment ranges.
*/
private static byte[][] split(byte[] source, List<Range> segments) {
byte[][] splits = new byte[segments.size()][];
for (int i = 0; i < segments.size(); i++) {
Range r = segments.get(i);
byte[] tmp = new byte[r.length()];
if (tmp.length > 0) {
System.arraycopy(source, r.start(), tmp, 0, r.length());
}
splits[i] = tmp;
}
return splits;
}
/**
* Splits the source array into multiple array segments using the given
* separator based on the sizes. This will naturally produce copied byte
* arrays for each of the split segments.
*
* @param source source array.
* @param separator separator represented as a byte array.
* @param sizes sizes of relative components/segments.
* @return byte[][] after splitting the source.
*/
private static byte[][] split(byte[] source, byte[] separator, int[] sizes) {
List<Range> segments = splitRanges(source, separator, sizes);
return split(source, segments);
}
/**
* Splits the source array into multiple array segments using the given
* separator. This will naturally produce copied byte arrays for each of the
* split segments.
*
* @param source Source array.
* @param separator Separator represented as a byte array.
* @return byte[][] after splitting the source.
*/
private static byte[][] split(byte[] source, byte[] separator) {
return split(source, separator, (int[]) null);
}
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
* copied byte arrays for each of the split segments.
*
* @param source Source array.
* @param separator Separator represented as a byte array.
* @param limit a non-positive value indicates no limit on number of segments.
* @return byte[][] after splitting the input source.
*/
private static byte[][] split(byte[] source, byte[] separator, int limit) {
int[] sizes = null;
if (limit > VARIABLE_SIZE) {
sizes = new int[limit];
}
List<Range> segments = splitRanges(source, separator, sizes);
return split(source, segments);
}
}
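A sketch of why the size-aware split matters for this fix (editorial illustration; the demo class and the particular timestamp value are made up, the Separator calls are the ones defined above). An 8-byte long can legitimately contain the '!' byte used by QUALIFIERS; a plain separator-based split would cut it in two, whereas a split with segment sizes keeps it whole:
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
public class SizeAwareSplitDemo {
  public static void main(String[] args) {
    // 0x21 is '!', the QUALIFIERS separator; this (made-up) timestamp
    // contains that byte in its big-endian representation.
    long timestamp = 0x1021304050607080L;
    byte[] source = Separator.QUALIFIERS.join(
        Separator.encode("cluster1", Separator.SPACE, Separator.TAB,
            Separator.QUALIFIERS),
        Bytes.toBytes(timestamp),
        Separator.encode("flow1", Separator.SPACE, Separator.TAB,
            Separator.QUALIFIERS));
    // The middle segment is declared to be exactly 8 bytes, so the '!'
    // inside the timestamp is treated as payload, not as a separator.
    byte[][] parts = Separator.QUALIFIERS.split(source, new int[] {
        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
    System.out.println(parts.length);                       // 3
    System.out.println(Bytes.toLong(parts[1]) == timestamp); // true
  }
}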


@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
/**
* Encodes and decodes column names / row keys which are merely strings.
 * Column prefixes are not part of the column name passed for encoding. They
 * are added later, if required, in the associated ColumnPrefix
 * implementations.
*/
public final class StringKeyConverter implements KeyConverter<String> {
private static final StringKeyConverter INSTANCE = new StringKeyConverter();
public static StringKeyConverter getInstance() {
return INSTANCE;
}
private StringKeyConverter() {
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(String key) {
return Separator.encode(key, Separator.SPACE, Separator.TAB);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public String decode(byte[] bytes) {
return Separator.decode(bytes, Separator.TAB, Separator.SPACE);
}
}
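A quick illustrative round trip (the demo class and the sample key are assumptions; the converter calls are as defined above):
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
public class StringKeyDemo {
  public static void main(String[] args) {
    StringKeyConverter converter = StringKeyConverter.getInstance();
    // Spaces and tabs are percent-encoded so a qualifier containing them
    // cannot collide with the separators used in column names.
    byte[] qualifier = converter.encode("some config key");
    System.out.println(converter.decode(qualifier)); // some config key
  }
}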


@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@ -48,18 +46,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* A bunch of utility functions used across TimelineReader and TimelineWriter.
@ -72,108 +69,9 @@ public final class TimelineStorageUtils {
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
/** empty bytes. */
public static final byte[] EMPTY_BYTES = new byte[0];
/** indicator for no limits for splitting. */
public static final int NO_LIMIT_SPLIT = -1;
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
* copied byte arrays for each of the split segments. To identify the split
* ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
*
* @param source Source array.
* @param separator Separator represented as a byte array.
* @return byte[][] after splitting the source
*/
public static byte[][] split(byte[] source, byte[] separator) {
return split(source, separator, NO_LIMIT_SPLIT);
}
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
* copied byte arrays for each of the split segments. To identify the split
* ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
*
* @param source Source array.
* @param separator Separator represented as a byte array.
* @param limit a non-positive value indicates no limit on number of segments.
* @return byte[][] after splitting the input source.
*/
public static byte[][] split(byte[] source, byte[] separator, int limit) {
List<Range> segments = splitRanges(source, separator, limit);
byte[][] splits = new byte[segments.size()][];
for (int i = 0; i < segments.size(); i++) {
Range r = segments.get(i);
byte[] tmp = new byte[r.length()];
if (tmp.length > 0) {
System.arraycopy(source, r.start(), tmp, 0, r.length());
}
splits[i] = tmp;
}
return splits;
}
/**
* Returns a list of ranges identifying [start, end) -- closed, open --
* positions within the source byte array that would be split using the
* separator byte array.
*
* @param source Source array.
* @param separator Separator represented as a byte array.
* @return a list of ranges.
*/
public static List<Range> splitRanges(byte[] source, byte[] separator) {
return splitRanges(source, separator, NO_LIMIT_SPLIT);
}
/**
* Returns a list of ranges identifying [start, end) -- closed, open --
* positions within the source byte array that would be split using the
* separator byte array.
*
* @param source the source data
* @param separator the separator pattern to look for
* @param limit the maximum number of splits to identify in the source
* @return a list of ranges.
*/
public static List<Range> splitRanges(byte[] source, byte[] separator,
int limit) {
List<Range> segments = new ArrayList<Range>();
if ((source == null) || (separator == null)) {
return segments;
}
int start = 0;
itersource: for (int i = 0; i < source.length; i++) {
for (int j = 0; j < separator.length; j++) {
if (source[i + j] != separator[j]) {
continue itersource;
}
}
// all separator elements matched
if (limit > 0 && segments.size() >= (limit - 1)) {
// everything else goes in one final segment
break;
}
segments.add(new Range(start, i));
start = i + separator.length;
// i will be incremented again in outer for loop
i += separator.length - 1;
}
// add in remaining to a final range
if (start <= source.length) {
segments.add(new Range(start, source.length));
}
return segments;
}
/**
 * Converts a timestamp into its inverse timestamp to be used in (row) keys
* where we want to have the most recent timestamp in the top of the table
@ -200,53 +98,6 @@ public final class TimelineStorageUtils {
return Integer.MAX_VALUE - key;
}
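For context, the inversion these (row) keys rely on can be sketched as follows (editorial illustration; the local invertLong below is assumed to mirror the invertInt body above):
public class InversionSketch {
  // Assumed Long analogue of invertInt above.
  static long invertLong(long key) {
    return Long.MAX_VALUE - key;
  }
  public static void main(String[] args) {
    long older = 1_000L;
    long newer = 2_000L;
    // Inverted values sort in reverse, so the most recent timestamp
    // appears first in a lexicographic HBase scan.
    System.out.println(invertLong(newer) < invertLong(older)); // true
    // The function is its own inverse, which is why decode simply
    // applies invertLong again.
    System.out.println(invertLong(invertLong(older)) == older); // true
  }
}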
/**
* Converts/encodes a string app Id into a byte representation for (row) keys.
* For conversion, we extract cluster timestamp and sequence id from the
* string app id (calls {@link ConverterUtils#toApplicationId(String)} for
* conversion) and then store it in a byte array of length 12 (8 bytes (long)
* for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster
* timestamp and sequence id are inverted so that the most recent cluster
* timestamp and highest sequence id appears first in the table (i.e.
* application id appears in a descending order).
*
* @param appIdStr application id in string format i.e.
* application_{cluster timestamp}_{sequence id with min 4 digits}
*
* @return encoded byte representation of app id.
*/
public static byte[] encodeAppId(String appIdStr) {
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
return appIdBytes;
}
/**
* Converts/decodes a 12 byte representation of app id for (row) keys to an
* app id in string format which can be returned back to client.
* For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
* timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
* {@link ApplicationId#toString} to generate string representation of app id.
*
* @param appIdBytes application id in byte representation.
*
* @return decoded app id in string format.
*/
public static String decodeAppId(byte[] appIdBytes) {
if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
throw new IllegalArgumentException("Invalid app id in byte format");
}
long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
int seqId =
invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
return ApplicationId.newInstance(clusterTs, seqId).toString();
}
/**
* returns the timestamp of that day's start (which is midnight 00:00:00 AM)
* for a given input timestamp.
@ -810,7 +661,8 @@ public final class TimelineStorageUtils {
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isRelatedTo) throws IOException {
// isRelatedTo and relatesTo are of type Map<String, Set<String>>
Map<String, Object> columns = prefix.readResults(result);
Map<String, Object> columns =
prefix.readResults(result, StringKeyConverter.getInstance());
for (Map.Entry<String, Object> column : columns.entrySet()) {
for (String id : Separator.VALUES.splitEncoded(
column.getValue().toString())) {
@ -837,7 +689,8 @@ public final class TimelineStorageUtils {
TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
boolean isConfig) throws IOException {
// info and configuration are of type Map<String, Object or String>
Map<String, Object> columns = prefix.readResults(result);
Map<String, Object> columns =
prefix.readResults(result, StringKeyConverter.getInstance());
if (isConfig) {
for (Map.Entry<String, Object> column : columns.entrySet()) {
entity.addConfig(column.getKey(), column.getValue().toString());
@ -861,30 +714,24 @@ public final class TimelineStorageUtils {
public static <T> void readEvents(TimelineEntity entity, Result result,
ColumnPrefix<T> prefix) throws IOException {
Map<String, TimelineEvent> eventsMap = new HashMap<>();
Map<?, Object> eventsResult =
prefix.readResultsHavingCompoundColumnQualifiers(result);
for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
byte[][] karr = (byte[][])eventResult.getKey();
// the column name is of the form "eventId=timestamp=infoKey"
if (karr.length == 3) {
String id = Bytes.toString(karr[0]);
long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
TimelineEvent event = eventsMap.get(key);
if (event == null) {
event = new TimelineEvent();
event.setId(id);
event.setTimestamp(ts);
eventsMap.put(key, event);
}
// handle empty info
String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
if (infoKey != null) {
event.addInfo(infoKey, eventResult.getValue());
}
} else {
LOG.warn("incorrectly formatted column name: it will be discarded");
continue;
Map<EventColumnName, Object> eventsResult =
prefix.readResults(result, EventColumnNameConverter.getInstance());
for (Map.Entry<EventColumnName, Object>
eventResult : eventsResult.entrySet()) {
EventColumnName eventColumnName = eventResult.getKey();
String key = eventColumnName.getId() +
Long.toString(eventColumnName.getTimestamp());
// Retrieve previously seen event to add to it
TimelineEvent event = eventsMap.get(key);
if (event == null) {
// First time we're seeing this event, add it to the eventsMap
event = new TimelineEvent();
event.setId(eventColumnName.getId());
event.setTimestamp(eventColumnName.getTimestamp());
eventsMap.put(key, event);
}
if (eventColumnName.getInfoKey() != null) {
event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
}
}
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());


@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
@ -78,7 +79,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private final boolean compoundColQual;
/**
* Private constructor, meant to be used by the enum definition.
@ -122,7 +122,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
this.compoundColQual = compondColQual;
}
/**
@ -154,14 +153,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return column.getValueConverter();
}
public byte[] getCompoundColQualBytes(String qualifier,
byte[]...components) {
if (!compoundColQual) {
return ColumnHelper.getColumnQualifier(null, qualifier);
}
return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
}
/*
* (non-Javadoc)
*
@ -233,26 +224,12 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result)
* #readResults(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public Map<String, Object> readResults(Result result) throws IOException {
return column.readResults(result, columnPrefixBytes);
}
/**
* @param result from which to read columns
* @return the latest values of columns in the column family. The column
* qualifier is returned as a list of parts, each part a byte[]. This
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException if there is any exception encountered while reading
* result.
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
return column.readResultsHavingCompoundColumnQualifiers(result,
columnPrefixBytes);
public <K> Map<K, Object> readResults(Result result,
KeyConverter<K> keyConverter) throws IOException {
return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@ -260,11 +237,14 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes);
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes,
keyConverter);
}
/**


@ -17,10 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Represents a rowkey for the entity table.
*/
@ -28,13 +24,13 @@ public class EntityRowKey {
private final String clusterId;
private final String userId;
private final String flowName;
private final long flowRunId;
private final Long flowRunId;
private final String appId;
private final String entityType;
private final String entityId;
public EntityRowKey(String clusterId, String userId, String flowName,
long flowRunId, String appId, String entityType, String entityId) {
Long flowRunId, String appId, String entityType, String entityId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowName = flowName;
@ -56,7 +52,7 @@ public class EntityRowKey {
return flowName;
}
public long getFlowRunId() {
public Long getFlowRunId() {
return flowRunId;
}
@ -85,14 +81,8 @@ public class EntityRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId, String appId) {
byte[] first =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
flowName));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
byte[] third = TimelineStorageUtils.encodeAppId(appId);
return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
clusterId, userId, flowName, flowRunId, appId, null, null));
}
/**
@ -111,16 +101,8 @@ public class EntityRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName, Long flowRunId, String appId, String entityType) {
byte[] first =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
flowName));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
byte[] third = TimelineStorageUtils.encodeAppId(appId);
byte[] fourth =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
return Separator.QUALIFIERS.join(first, second, third, fourth);
return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
clusterId, userId, flowName, flowRunId, appId, entityType, null));
}
/**
@ -140,16 +122,8 @@ public class EntityRowKey {
public static byte[] getRowKey(String clusterId, String userId,
String flowName, Long flowRunId, String appId, String entityType,
String entityId) {
byte[] first =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
flowName));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
byte[] third = TimelineStorageUtils.encodeAppId(appId);
byte[] fourth =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
return Separator.QUALIFIERS.join(first, second, third, fourth);
return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
}
/**
@ -159,27 +133,6 @@ public class EntityRowKey {
* @return An <cite>EntityRowKey</cite> object.
*/
public static EntityRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 7) {
throw new IllegalArgumentException("the row key is not valid for " +
"an entity");
}
String userId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
String clusterId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
String flowName =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
long flowRunId =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
String entityType =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
String entityId =
Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
entityType, entityId);
return EntityRowKeyConverter.getInstance().decode(rowKey);
}
}


@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Encodes and decodes row key for entity table.
 * The row key is of the form:
 * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
 * flowRunId is a long, appId is encoded/decoded using
 * {@link AppIdKeyConverter} and the rest are strings.
*/
public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
private static final EntityRowKeyConverter INSTANCE =
new EntityRowKeyConverter();
public static EntityRowKeyConverter getInstance() {
return INSTANCE;
}
private EntityRowKeyConverter() {
}
// Entity row key is of the form
// userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
// segment separated by !. The sizes below indicate sizes of each one of these
// segments in sequence. clusterId, userName, flowName, entityType and
// entityId are strings. flowRunId is a long hence 8 bytes in size. The app
// id is represented as 12 bytes, with the cluster timestamp part of the app
// id being 8 bytes (long) and the seq id being 4 bytes (int).
// Strings are variable in size (i.e. end whenever separator is encountered).
// This is used while decoding and helps in determining where to split.
private static final int[] SEGMENT_SIZES = {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
/*
* (non-Javadoc)
*
* Encodes EntityRowKey object into a byte array with each component/field in
* EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
* table row key of the form
* userName!clusterId!flowName!flowRunId!appId!entityType!entityId
 * If entityType in the passed EntityRowKey object is null (and the fields
 * preceding it, i.e. clusterId, userId, flowName, flowRunId and appId, are
 * not null), this returns a row key prefix of the form
 * userName!clusterId!flowName!flowRunId!appId! and if entityId in
 * EntityRowKey is null (the other 6 components are not null), this returns
 * a row key prefix of the form
* userName!clusterId!flowName!flowRunId!appId!entityType!
* flowRunId is inverted while encoding as it helps maintain a descending
* order for row keys in entity table.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(EntityRowKey rowKey) {
byte[] user = Separator.encode(rowKey.getUserId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] cluster = Separator.encode(rowKey.getClusterId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] flow = Separator.encode(rowKey.getFlowName(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
rowKey.getFlowRunId()));
byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
if (rowKey.getEntityType() == null) {
return Separator.QUALIFIERS.join(
first, second, third, Separator.EMPTY_BYTES);
}
byte[] entityType = Separator.encode(rowKey.getEntityType(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS);
byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
return Separator.QUALIFIERS.join(first, second, third, fourth);
}
/*
* (non-Javadoc)
*
 * Decodes an entity row key of the form
* userName!clusterId!flowName!flowRunId!appId!entityType!entityId represented
* in byte format and converts it into an EntityRowKey object. flowRunId is
* inverted while decoding as it was inverted while encoding.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public EntityRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 7) {
throw new IllegalArgumentException("the row key is not valid for " +
"an entity");
}
String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
Long flowRunId =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String entityId = Separator.decode(Bytes.toString(rowKeyComponents[6]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
entityType, entityId);
}
}
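An illustrative round trip through this converter (editorial; the demo class and sample values are assumptions, the constructor and converter signatures are the ones shown above):
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter;
public class EntityRowKeyRoundTrip {
  public static void main(String[] args) {
    EntityRowKeyConverter converter = EntityRowKeyConverter.getInstance();
    byte[] rowKey = converter.encode(new EntityRowKey("cluster1", "user1",
        "flow_name", 1002345678919L, "application_1231111111_1111",
        "YARN_CONTAINER", "container_1231111111_1111_01_000001"));
    EntityRowKey decoded = converter.decode(rowKey);
    // The flowRunId long survives intact even if its raw bytes happen to
    // contain separator bytes, thanks to its fixed-size segment.
    System.out.println(decoded.getFlowRunId()); // 1002345678919
    System.out.println(decoded.getEntityId());
  }
}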


@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@ -51,7 +52,6 @@ public enum FlowActivityColumnPrefix
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private final boolean compoundColQual;
private final AggregationOperation aggOp;
@ -83,7 +83,6 @@ public enum FlowActivityColumnPrefix
.encode(columnPrefix));
}
this.aggOp = aggOp;
this.compoundColQual = compoundColQual;
}
/**
@ -169,10 +168,12 @@ public enum FlowActivityColumnPrefix
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result)
* #readResults(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public Map<String, Object> readResults(Result result) throws IOException {
return column.readResults(result, columnPrefixBytes);
public <K> Map<K, Object> readResults(Result result,
KeyConverter<K> keyConverter) throws IOException {
return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@ -180,11 +181,14 @@ public enum FlowActivityColumnPrefix
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <T> NavigableMap<String, NavigableMap<Long, T>>
readResultsWithTimestamps(Result result) throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes);
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes,
keyConverter);
}
/**
@ -270,20 +274,4 @@ public enum FlowActivityColumnPrefix
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}
@Override
public byte[] getCompoundColQualBytes(String qualifier,
byte[]...components) {
if (!compoundColQual) {
return ColumnHelper.getColumnQualifier(null, qualifier);
}
return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
}
@Override
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
// There are no compound column qualifiers for flow activity table.
return null;
}
}


@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
@ -27,11 +25,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStor
public class FlowActivityRowKey {
private final String clusterId;
private final long dayTs;
private final Long dayTs;
private final String userId;
private final String flowName;
public FlowActivityRowKey(String clusterId, long dayTs, String userId,
public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
String flowName) {
this.clusterId = clusterId;
this.dayTs = dayTs;
@ -43,7 +41,7 @@ public class FlowActivityRowKey {
return clusterId;
}
public long getDayTimestamp() {
public Long getDayTimestamp() {
return dayTs;
}
@ -63,7 +61,8 @@ public class FlowActivityRowKey {
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId) {
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
return FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey(clusterId, null, null, null));
}
/**
@ -75,9 +74,8 @@ public class FlowActivityRowKey {
* @return byte array with the row key prefix
*/
public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), new byte[0]);
return FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey(clusterId, dayTs, null, null));
}
/**
@ -94,12 +92,8 @@ public class FlowActivityRowKey {
String flowName) {
// convert it to Day's time stamp
eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
return FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
}
/**
@ -109,21 +103,6 @@ public class FlowActivityRowKey {
* @return A <cite>FlowActivityRowKey</cite> object.
*/
public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 4) {
throw new IllegalArgumentException("the row key is not valid for "
+ "a flow activity");
}
String clusterId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[0]));
long dayTs =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
String userId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[2]));
String flowName = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[3]));
return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
}
}


@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Encodes and decodes row key for flow activity table.
 * The row key is of the form: clusterId!dayTimestamp!user!flowName.
 * dayTimestamp (the top-of-the-day timestamp) is a long and the rest are
 * strings.
*/
public final class FlowActivityRowKeyConverter implements
KeyConverter<FlowActivityRowKey> {
private static final FlowActivityRowKeyConverter INSTANCE =
new FlowActivityRowKeyConverter();
public static FlowActivityRowKeyConverter getInstance() {
return INSTANCE;
}
private FlowActivityRowKeyConverter() {
}
// Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
// with each segment separated by !. The sizes below indicate sizes of each
// one of these segments in sequence. clusterId, user and flowName are
// strings. Top of the day timestamp is a long hence 8 bytes in size.
// Strings are variable in size (i.e. end whenever separator is encountered).
// This is used while decoding and helps in determining where to split.
private static final int[] SEGMENT_SIZES = {
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE };
/*
* (non-Javadoc)
*
* Encodes FlowActivityRowKey object into a byte array with each
* component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
 * This leads to a flow activity table row key of the form
* clusterId!dayTimestamp!user!flowName
 * If the dayTimestamp in the passed FlowActivityRowKey object is null and
 * clusterId is not null, this returns a row key prefix of the form clusterId!
 * and if userId in FlowActivityRowKey is null (and the fields preceding it,
 * i.e. clusterId and dayTimestamp, are not null), this returns a row key
 * prefix of the form clusterId!dayTimestamp!
* dayTimestamp is inverted while encoding as it helps maintain a descending
* order for row keys in flow activity table.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(FlowActivityRowKey rowKey) {
if (rowKey.getDayTimestamp() == null) {
return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
Separator.EMPTY_BYTES);
}
if (rowKey.getUserId() == null) {
return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
Bytes.toBytes(TimelineStorageUtils.invertLong(
rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
}
return Separator.QUALIFIERS.join(
Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS),
Bytes.toBytes(
TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())),
Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS),
Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS));
}
@Override
public FlowActivityRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 4) {
throw new IllegalArgumentException("the row key is not valid for "
+ "a flow activity");
}
String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
Long dayTs =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
}
}
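As a usage illustration, a minimal round-trip sketch for this converter (hypothetical values, not part of the patch; it relies only on the encode/decode logic above):
FlowActivityRowKey key =
new FlowActivityRowKey("cluster1", 1459900800000L, "user1", "flow_name");
byte[] bytes = FlowActivityRowKeyConverter.getInstance().encode(key);
FlowActivityRowKey decoded =
FlowActivityRowKeyConverter.getInstance().decode(bytes);
// decoded now equals key field by field.
// Null user and flow name produce a prefix of the form
// clusterId!invertedDayTs! suitable for a prefix scan over a single day.
byte[] prefix = FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey("cluster1", 1459900800000L, null, null));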

View File

@ -26,10 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
/**
@ -40,8 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/**
* To store flow run info values.
*/
METRIC(FlowRunColumnFamily.INFO, "m", null,
LongConverter.getInstance());
METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
private final ColumnHelper<FlowRunTable> column;
private final ColumnFamily<FlowRunTable> columnFamily;
@ -52,17 +52,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private final AggregationOperation aggOp;
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
*/
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra, ValueConverter converter) {
@ -79,11 +76,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
this.columnPrefixBytes = null;
} else {
// Future-proof by ensuring the right column prefix hygiene.
this.columnPrefixBytes =
Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
}
this.aggOp = fra;
}
/**
@ -99,14 +95,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
@Override
public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
qualifierPrefix);
}
@Override
public byte[] getColumnPrefixBytes(String qualifierPrefix) {
return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
qualifierPrefix);
}
@Override
@ -139,8 +135,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@ -166,8 +162,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
}
byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes =
TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
@ -180,8 +176,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
*/
public Object readResult(Result result, String qualifier) throws IOException {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
return column.readResult(result, columnQualifier);
}
@ -190,10 +186,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <K> Map<K, Object> readResults(Result result,
KeyConverter<K> keyConverter) throws IOException {
return column.readResults(result, columnPrefixBytes, keyConverter);
}
/*
@ -201,11 +199,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
*/
public <K, V> NavigableMap<K, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes,
keyConverter);
}
/**
@ -213,8 +214,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
* and only if {@code x.equals(y)} or {@code (x == y == null)}
*
* @param columnPrefix Name of the column to retrieve
* @return the corresponding {@link FlowRunColumnPrefix} or null
*/
public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
@ -242,10 +242,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
* {@code columnFor(a,x) == columnFor(b,y)} if and only if
* {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
*
* @param columnFamily The columnFamily for which to retrieve the column.
* @param columnPrefix Name of the column to retrieve
* @return the corresponding {@link FlowRunColumnPrefix} or null if both
* arguments don't match.
*/
@ -267,20 +265,4 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
// Default to null
return null;
}
}
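As a usage illustration of the generified read methods (a sketch, not part of the patch: result stands for an HBase Result fetched from the flow run table, and the converter choice mirrors the readers elsewhere in this commit):
// Metric values keyed by their string qualifiers.
Map<String, Object> metrics =
FlowRunColumnPrefix.METRIC.readResults(result,
StringKeyConverter.getInstance());
// Time-series variant: qualifier -> (cell timestamp -> value).
NavigableMap<String, NavigableMap<Long, Number>> series =
FlowRunColumnPrefix.METRIC.readResultsWithTimestamps(result,
StringKeyConverter.getInstance());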

View File

@ -17,10 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
/**
* Represents a rowkey for the flow run table.
*/
@ -28,10 +24,10 @@ public class FlowRunRowKey {
private final String clusterId;
private final String userId;
private final String flowName;
private final Long flowRunId;
public FlowRunRowKey(String clusterId, String userId, String flowName,
Long flowRunId) {
this.clusterId = clusterId;
this.userId = userId;
this.flowName = flowName;
@ -50,7 +46,7 @@ public class FlowRunRowKey {
return flowName;
}
public Long getFlowRunId() {
return flowRunId;
}
@ -65,13 +61,13 @@ public class FlowRunRowKey {
*/
public static byte[] getRowKeyPrefix(String clusterId, String userId,
String flowName) {
return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
clusterId, userId, flowName, null));
}
/**
* Constructs a row key for the flow run table as follows: {
* clusterId!userId!flowName!Inverted Flow Run Id}.
*
* @param clusterId Cluster Id.
* @param userId User Id.
@ -81,12 +77,8 @@ public class FlowRunRowKey {
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowName, Long flowRunId) {
return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
clusterId, userId, flowName, flowRunId));
}
/**
@ -96,22 +88,7 @@ public class FlowRunRowKey {
* @return A <cite>FlowRunRowKey</cite> object.
*/
public static FlowRunRowKey parseRowKey(byte[] rowKey) {
return FlowRunRowKeyConverter.getInstance().decode(rowKey);
}
/**

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
/**
* Encodes and decodes row key for flow run table.
* The row key is of the form: clusterId!userId!flowName!flowrunId.
* flowrunId is a long and the rest are strings.
*/
public final class FlowRunRowKeyConverter implements
KeyConverter<FlowRunRowKey> {
private static final FlowRunRowKeyConverter INSTANCE =
new FlowRunRowKeyConverter();
public static FlowRunRowKeyConverter getInstance() {
return INSTANCE;
}
private FlowRunRowKeyConverter() {
}
// Flow run row key is of the form
// clusterId!userId!flowName!flowrunId with each segment separated by !.
// The sizes below indicate sizes of each one of these segments in sequence.
// clusterId, userId and flowName are strings. flowrunId is a long hence 8
// bytes in size. Strings are variable in size (i.e. end whenever separator is
// encountered). This is used while decoding and helps in determining where to
// split.
private static final int[] SEGMENT_SIZES = {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG };
/*
* (non-Javadoc)
*
* Encodes a FlowRunRowKey object into a byte array with each component/field
* in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a
* flow run row key of the form clusterId!userId!flowName!flowrunId.
* If flowRunId in the passed FlowRunRowKey object is null (and the fields
* preceding it i.e. clusterId, userId and flowName are not null), this
* returns a row key prefix of the form clusterId!userId!flowName!
* flowRunId is inverted while encoding as it helps maintain a descending
* order for row keys in the flow run table.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #encode(java.lang.Object)
*/
@Override
public byte[] encode(FlowRunRowKey rowKey) {
byte[] first = Separator.QUALIFIERS.join(
Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS),
Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS),
Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS));
if (rowKey.getFlowRunId() == null) {
return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
} else {
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
rowKey.getFlowRunId()));
return Separator.QUALIFIERS.join(first, second);
}
}
/*
* (non-Javadoc)
*
* Decodes a flow run row key of the form
* clusterId!userId!flowName!flowrunId represented in byte format and converts
* it into a FlowRunRowKey object. flowRunId is inverted while decoding as
* it was inverted while encoding.
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
* #decode(byte[])
*/
@Override
public FlowRunRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 4) {
throw new IllegalArgumentException("the row key is not valid for " +
"a flow run");
}
String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
Long flowRunId =
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
}
}
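To see the effect of the inversion, a small sketch (hypothetical run ids, not part of the patch; it assumes larger run ids encode to byte-wise smaller keys, which is exactly the descending order the comments above describe):
byte[] run1 = FlowRunRowKeyConverter.getInstance().encode(
new FlowRunRowKey("cluster1", "user1", "flow_name", 1001L));
byte[] run2 = FlowRunRowKeyConverter.getInstance().encode(
new FlowRunRowKey("cluster1", "user1", "flow_name", 1002L));
// The later run sorts first: Bytes.compareTo(run2, run1) < 0, so a scan
// over the cluster1!user1!flow_name! prefix returns the newest runs first.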

View File

@ -44,9 +44,10 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import com.google.common.annotations.VisibleForTesting;
@ -193,7 +194,7 @@ class FlowScanner implements RegionScanner, Closeable {
// So all cells in one qualifier come one after the other before we see the
// next column qualifier
ByteArrayComparator comp = new ByteArrayComparator();
byte[] currentColumnQualifier = Separator.EMPTY_BYTES;
AggregationOperation currentAggOp = null;
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>();
@ -314,7 +315,7 @@ class FlowScanner implements RegionScanner, Closeable {
+ " cell qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(cell))
+ " cell value= "
+ converter.decodeValue(CellUtil.cloneValue(cell))
+ " timestamp=" + cell.getTimestamp());
}
@ -480,7 +481,7 @@ class FlowScanner implements RegionScanner, Closeable {
LOG.trace("MAJOR COMPACTION loop sum= " + sum
+ " discarding now: " + " qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+ converter.decodeValue(CellUtil.cloneValue(cell))
+ " timestamp=" + cell.getTimestamp() + " " + this.action);
}
} else {

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrie
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
@ -125,7 +126,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
protected TimelineEntity parseEntity(Result result) throws IOException {
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
Long time = rowKey.getDayTimestamp();
String user = rowKey.getUserId();
String flowName = rowKey.getFlowName();
@ -135,10 +136,11 @@ class FlowActivityEntityReader extends TimelineEntityReader {
flowActivity.setId(flowActivity.getId());
// get the list of run ids along with the version that are associated with
// this flow on this day
Map<Long, Object> runIdsMap =
FlowActivityColumnPrefix.RUN_ID.readResults(result,
LongKeyConverter.getInstance());
for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
Long runId = e.getKey();
String version = (String)e.getValue();
FlowRunEntity flowRun = new FlowRunEntity();
flowRun.setUser(user);

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilter
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
/**
* The base class for reading and deserializing timeline entities from the
@ -329,7 +330,8 @@ public abstract class TimelineEntityReader {
protected void readMetrics(TimelineEntity entity, Result result,
ColumnPrefix<?> columnPrefix) throws IOException {
NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
columnPrefix.readResultsWithTimestamps(
result, StringKeyConverter.getInstance());
for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
metricsResult.entrySet()) {
TimelineMetric metric = new TimelineMetric();

View File

@ -0,0 +1,293 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter;
import org.junit.Test;
public class TestKeyConverters {
private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
private final static byte[] QUALIFIER_SEP_BYTES =
Bytes.toBytes(QUALIFIER_SEP);
private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
private final static String USER = QUALIFIER_SEP + "user";
private final static String FLOW_NAME =
"dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP;
private final static Long FLOW_RUN_ID;
private final static String APPLICATION_ID;
static {
long runid = Long.MAX_VALUE - 900L;
byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
byte[] byteArr = Bytes.toBytes(runid);
int sepByteLen = QUALIFIER_SEP_BYTES.length;
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
}
}
FLOW_RUN_ID = Bytes.toLong(byteArr);
long clusterTs = System.currentTimeMillis();
byteArr = Bytes.toBytes(clusterTs);
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[byteArr.length - sepByteLen + i] =
(byte)(longMaxByteArr[byteArr.length - sepByteLen + i] -
QUALIFIER_SEP_BYTES[i]);
}
}
clusterTs = Bytes.toLong(byteArr);
int seqId = 222;
APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
}
private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
int sepLen = QUALIFIER_SEP_BYTES.length;
for (int i = 0; i < sepLen; i++) {
assertTrue("Row key prefix not encoded properly.",
byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
QUALIFIER_SEP_BYTES[i]);
}
}
@Test
public void testFlowActivityRowKeyConverter() {
Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L);
byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME));
FlowActivityRowKey rowKey =
FlowActivityRowKeyConverter.getInstance().decode(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(ts, rowKey.getDayTimestamp());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey(CLUSTER, null, null, null));
byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
assertEquals(2, splits.length);
assertEquals(0, splits[1].length);
assertEquals(CLUSTER,
Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
new FlowActivityRowKey(CLUSTER, ts, null, null));
splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
assertEquals(3, splits.length);
assertEquals(0, splits[2].length);
assertEquals(CLUSTER,
Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
assertEquals(ts, (Long) TimelineStorageUtils.invertLong(
Bytes.toLong(splits[1])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testFlowRunRowKeyConverter() {
byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode(
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID));
FlowRunRowKey rowKey =
FlowRunRowKeyConverter.getInstance().decode(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode(
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null));
byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
assertEquals(4, splits.length);
assertEquals(0, splits[3].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testApplicationRowKeyConverter() {
byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode(
new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID));
ApplicationRowKey rowKey =
ApplicationRowKeyConverter.getInstance().decode(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null));
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE });
assertEquals(5, splits.length);
assertEquals(0, splits[4].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong(
Bytes.toLong(splits[3])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null));
splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
assertEquals(4, splits.length);
assertEquals(0, splits[3].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testEntityRowKeyConverter() {
String entityId = "!ent!ity!!id!";
String entityType = "entity!Type";
byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode(
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entityType, entityId));
EntityRowKey rowKey =
EntityRowKeyConverter.getInstance().decode(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entityType, rowKey.getEntityType());
assertEquals(entityId, rowKey.getEntityId());
byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entityType, null));
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE });
assertEquals(7, splits.length);
assertEquals(0, splits[6].length);
assertEquals(APPLICATION_ID,
AppIdKeyConverter.getInstance().decode(splits[4]));
assertEquals(entityType, Separator.QUALIFIERS.decode(
Bytes.toString(splits[5])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
null, null));
splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE });
assertEquals(6, splits.length);
assertEquals(0, splits[5].length);
assertEquals(APPLICATION_ID,
AppIdKeyConverter.getInstance().decode(splits[4]));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testAppToFlowRowKeyConverter() {
byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode(
new AppToFlowRowKey(CLUSTER, APPLICATION_ID));
AppToFlowRowKey rowKey =
AppToFlowRowKeyConverter.getInstance().decode(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
}
@Test
public void testAppIdKeyConverter() {
long currentTs = System.currentTimeMillis();
ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
String appIdStr1 = appId1.toString();
String appIdStr2 = appId2.toString();
String appIdStr3 = appId3.toString();
byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1);
byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2);
byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3);
// App ids should be encoded in a manner that maintains descending order.
assertTrue("Ordering of app ids is incorrect",
Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1);
String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2);
String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3);
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr1.equals(decodedAppId1));
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr2.equals(decodedAppId2));
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr3.equals(decodedAppId3));
}
@Test
public void testEventColumnNameConverter() {
String eventId = "=foo_=eve=nt=";
byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue());
byte[] maxByteArr =
Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
byte[] ts = Bytes.add(valSepBytes, maxByteArr);
Long eventTs = Bytes.toLong(ts);
byte[] byteEventColName = EventColumnNameConverter.getInstance().encode(
new EventColumnName(eventId, eventTs, null));
EventColumnName eventColName =
EventColumnNameConverter.getInstance().decode(byteEventColName);
assertEquals(eventId, eventColName.getId());
assertEquals(eventTs, eventColName.getTimestamp());
assertNull(eventColName.getInfoKey());
String infoKey = "f=oo_event_in=fo=_key";
byteEventColName = EventColumnNameConverter.getInstance().encode(
new EventColumnName(eventId, eventTs, infoKey));
eventColName =
EventColumnNameConverter.getInstance().decode(byteEventColName);
assertEquals(eventId, eventColName.getId());
assertEquals(eventTs, eventColName.getTimestamp());
assertEquals(infoKey, eventColName.getInfoKey());
}
}

View File

@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import com.google.common.collect.Iterables;
@ -32,7 +34,7 @@ public class TestSeparator {
private static String villain = "Dr. Heinz Doofenshmirtz";
private static String special =
". * | ? + ( ) [ ] { } ^ $ \\ \"";
". * | ? + \t ( ) [ ] { } ^ $ \\ \"";
/**
*
@ -47,6 +49,7 @@ public class TestSeparator {
testEncodeDecode(separator, "?");
testEncodeDecode(separator, "&");
testEncodeDecode(separator, "+");
testEncodeDecode(separator, "\t");
testEncodeDecode(separator, "Dr.");
testEncodeDecode(separator, "Heinz");
testEncodeDecode(separator, "Doofenshmirtz");
@ -79,6 +82,83 @@ public class TestSeparator {
}
@Test
public void testSplits() {
byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE);
byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE);
for (Separator separator : Separator.values()) {
String str1 = "cl" + separator.getValue() + "us";
String str2 = separator.getValue() + "rst";
byte[] sepByteArr = Bytes.toBytes(separator.getValue());
byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length));
byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes,
sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length));
byte[] arr = separator.join(
Bytes.toBytes(separator.encode(str1)),longVal1Arr,
Bytes.toBytes(separator.encode(str2)), intVal1Arr);
int[] sizes = { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT };
byte[][] splits = separator.split(arr, sizes);
assertEquals(4, splits.length);
assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG -
sepByteArr.length), sepByteArr);
intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT -
sepByteArr.length), sepByteArr);
arr = separator.join(Bytes.toBytes(separator.encode(str1)),longVal1Arr,
Bytes.toBytes(separator.encode(str2)), intVal1Arr);
splits = separator.split(arr, sizes);
assertEquals(4, splits.length);
assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
sepByteArr.length, 4 - sepByteArr.length), sepByteArr);
longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 -
sepByteArr.length), sepByteArr);
arr = separator.join(Bytes.toBytes(separator.encode(str1)),longVal1Arr,
Bytes.toBytes(separator.encode(str2)), intVal1Arr);
splits = separator.split(arr, sizes);
assertEquals(4, splits.length);
assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
arr = separator.join(Bytes.toBytes(separator.encode(str1)),
Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr);
int[] sizes1 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG };
splits = separator.split(arr, sizes1);
assertEquals(4, splits.length);
assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
assertEquals(str2, separator.decode(Bytes.toString(splits[1])));
assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2]));
assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3]));
try {
int[] sizes2 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Bytes.SIZEOF_INT, 7 };
splits = separator.split(arr, sizes2);
fail("Exception should have been thrown.");
} catch (IllegalArgumentException e) {}
try {
int[] sizes2 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2,
Bytes.SIZEOF_LONG };
splits = separator.split(arr, sizes2);
fail("Exception should have been thrown.");
} catch (IllegalArgumentException e) {}
}
}
/**
* Simple test to encode and decode using the same separators and confirm that
* we end up with the same as what we started with.

View File

@ -1,56 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Test;
public class TestTimelineStorageUtils {
@Test
public void testEncodeDecodeAppId() {
long currentTs = System.currentTimeMillis();
ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
String appIdStr1 = appId1.toString();
String appIdStr2 = appId2.toString();
String appIdStr3 = appId3.toString();
byte[] appIdBytes1 = TimelineStorageUtils.encodeAppId(appIdStr1);
byte[] appIdBytes2 = TimelineStorageUtils.encodeAppId(appIdStr2);
byte[] appIdBytes3 = TimelineStorageUtils.encodeAppId(appIdStr3);
// App ids' should be encoded in a manner wherein descending order
// is maintained.
assertTrue("Ordering of app ids' is incorrect",
Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
String decodedAppId1 = TimelineStorageUtils.decodeAppId(appIdBytes1);
String decodedAppId2 = TimelineStorageUtils.decodeAppId(appIdBytes2);
String decodedAppId3 = TimelineStorageUtils.decodeAppId(appIdBytes3);
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr1.equals(decodedAppId1));
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr2.equals(decodedAppId2));
assertTrue("Decoded app id is not same as the app id encoded",
appIdStr3.equals(decodedAppId3));
}
}