YARN-4025. Deal with byte representations of Longs in writer code. Contributed by Sangjin Lee and Vrushali C.

This commit is contained in:
Junping Du 2015-08-19 10:00:33 -07:00 committed by Sangjin Lee
parent 477a30f536
commit 7a41b5501e
10 changed files with 370 additions and 141 deletions

View File

@ -19,12 +19,9 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NavigableSet; import java.util.NavigableSet;
@ -431,34 +428,36 @@ public class HBaseTimelineReaderImpl
Map<String, Object> columns = prefix.readResults(result); Map<String, Object> columns = prefix.readResults(result);
if (isConfig) { if (isConfig) {
for (Map.Entry<String, Object> column : columns.entrySet()) { for (Map.Entry<String, Object> column : columns.entrySet()) {
entity.addConfig(column.getKey(), column.getKey().toString()); entity.addConfig(column.getKey(), column.getValue().toString());
} }
} else { } else {
entity.addInfo(columns); entity.addInfo(columns);
} }
} }
/**
* Read events from the entity table or the application table. The column name
* is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
* if there is no info associated with the event.
*
* See {@link EntityTable} and {@link ApplicationTable} for a more detailed
* schema description.
*/
private static void readEvents(TimelineEntity entity, Result result, private static void readEvents(TimelineEntity entity, Result result,
boolean isApplication) throws IOException { boolean isApplication) throws IOException {
Map<String, TimelineEvent> eventsMap = new HashMap<>(); Map<String, TimelineEvent> eventsMap = new HashMap<>();
Map<String, Object> eventsResult = isApplication ? Map<?, Object> eventsResult = isApplication ?
ApplicationColumnPrefix.EVENT.readResults(result) : ApplicationColumnPrefix.EVENT.
EntityColumnPrefix.EVENT.readResults(result); readResultsHavingCompoundColumnQualifiers(result) :
for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) { EntityColumnPrefix.EVENT.
Collection<String> tokens = readResultsHavingCompoundColumnQualifiers(result);
Separator.VALUES.splitEncoded(eventResult.getKey()); for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
if (tokens.size() != 2 && tokens.size() != 3) { byte[][] karr = (byte[][])eventResult.getKey();
throw new IOException( // the column name is of the form "eventId=timestamp=infoKey"
"Invalid event column name: " + eventResult.getKey()); if (karr.length == 3) {
} String id = Bytes.toString(karr[0]);
Iterator<String> idItr = tokens.iterator(); long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
String id = idItr.next(); String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
String tsStr = idItr.next();
// TODO: timestamp is not correct via ser/des through UTF-8 string
Long ts =
TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
StandardCharsets.UTF_8)));
String key = Separator.VALUES.joinEncoded(id, ts.toString());
TimelineEvent event = eventsMap.get(key); TimelineEvent event = eventsMap.get(key);
if (event == null) { if (event == null) {
event = new TimelineEvent(); event = new TimelineEvent();
@ -466,10 +465,15 @@ public class HBaseTimelineReaderImpl
event.setTimestamp(ts); event.setTimestamp(ts);
eventsMap.put(key, event); eventsMap.put(key, event);
} }
if (tokens.size() == 3) { // handle empty info
String infoKey = idItr.next(); String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
if (infoKey != null) {
event.addInfo(infoKey, eventResult.getValue()); event.addInfo(infoKey, eventResult.getValue());
} }
} else {
LOG.warn("incorrectly formatted column name: it will be discarded");
continue;
}
} }
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
entity.addEvents(eventsSet); entity.addEvents(eventsSet);

View File

@ -300,25 +300,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
byte[] compoundColumnQualifierBytes = byte[] compoundColumnQualifierBytes =
Separator.VALUES.join(columnQualifierWithTsBytes, Separator.VALUES.join(columnQualifierWithTsBytes,
null); null);
String compoundColumnQualifier = if (isApplication) {
Bytes.toString(compoundColumnQualifierBytes); ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
compoundColumnQualifierBytes, null,
TimelineWriterUtils.EMPTY_BYTES);
} else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable, EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES); compoundColumnQualifierBytes, null,
TimelineWriterUtils.EMPTY_BYTES);
}
} else { } else {
for (Map.Entry<String, Object> info : eventInfo.entrySet()) { for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
// eventId?infoKey // eventId?infoKey
byte[] compoundColumnQualifierBytes = byte[] compoundColumnQualifierBytes =
Separator.VALUES.join(columnQualifierWithTsBytes, Separator.VALUES.join(columnQualifierWithTsBytes,
Bytes.toBytes(info.getKey())); Bytes.toBytes(info.getKey()));
// convert back to string to avoid additional API on store.
String compoundColumnQualifier =
Bytes.toString(compoundColumnQualifierBytes);
if (isApplication) { if (isApplication) {
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
compoundColumnQualifier, null, info.getValue()); compoundColumnQualifierBytes, null, info.getValue());
} else { } else {
EntityColumnPrefix.EVENT.store(rowKey, entityTable, EntityColumnPrefix.EVENT.store(rowKey, entityTable,
compoundColumnQualifier, null, info.getValue()); compoundColumnQualifierBytes, null, info.getValue());
} }
} // for info: eventInfo } // for info: eventInfo
} }

View File

@ -101,6 +101,31 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return columnPrefix; return columnPrefix;
} }
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue) throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
@ -150,6 +175,21 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return column.readResults(result, columnPrefixBytes); return column.readResults(result, columnPrefixBytes);
} }
/**
* @param result from which to read columns
* @return the latest values of columns in the column family. The column
* qualifier is returned as a list of parts, each part a byte[]. This
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
return column.readResultsHavingCompoundColumnQualifiers(result,
columnPrefixBytes);
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *

View File

@ -57,12 +57,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
* | | infoValue | metricValue1 | | * | | infoValue | metricValue1 | |
* | | | @timestamp2 | | * | | | @timestamp2 | |
* | | r!relatesToKey: | | | * | | r!relatesToKey: | | |
* | | id3?id4?id5 | | | * | | id3=id4=id5 | | |
* | | | | | * | | | | |
* | | s!isRelatedToKey: | | | * | | s!isRelatedToKey: | | |
* | | id7?id9?id6 | | | * | | id7=id9=id6 | | |
* | | | | | * | | | | |
* | | e!eventId?timestamp?infoKey: | | | * | | e!eventId=timestamp=infoKey: | | |
* | | eventInfoValue | | | * | | eventInfoValue | | |
* | | | | | * | | | | |
* | | flowVersion: | | | * | | flowVersion: | | |

View File

@ -24,6 +24,8 @@ import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
* @param <T> refers to the table. * @param <T> refers to the table.
*/ */
public class ColumnHelper<T> { public class ColumnHelper<T> {
private static final Log LOG = LogFactory.getLog(ColumnHelper.class);
private final ColumnFamily<T> columnFamily; private final ColumnFamily<T> columnFamily;
@ -143,6 +146,7 @@ public class ColumnHelper<T> {
.entrySet()) { .entrySet()) {
String columnName = null; String columnName = null;
if (columnPrefixBytes == null) { if (columnPrefixBytes == null) {
LOG.info("null prefix was specified; returning all columns");
// Decode the spaces we encoded in the column name. // Decode the spaces we encoded in the column name.
columnName = Separator.decode(entry.getKey(), Separator.SPACE); columnName = Separator.decode(entry.getKey(), Separator.SPACE);
} else { } else {
@ -182,31 +186,42 @@ public class ColumnHelper<T> {
* @param result from which to read columns * @param result from which to read columns
* @param columnPrefixBytes optional prefix to limit columns. If null all * @param columnPrefixBytes optional prefix to limit columns. If null all
* columns are returned. * columns are returned.
* @return the latest values of columns in the column family. * @return the latest values of columns in the column family. This assumes
* that the column name parts are all Strings by default. If the
* column name parts should be treated natively and not be converted
* back and forth from Strings, you should use
* {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])}
* instead.
* @throws IOException * @throws IOException
*/ */
public Map<String, Object> readResults(Result result, byte[] columnPrefixBytes) public Map<String, Object> readResults(Result result,
throws IOException { byte[] columnPrefixBytes) throws IOException {
Map<String, Object> results = new HashMap<String, Object>(); Map<String, Object> results = new HashMap<String, Object>();
if (result != null) { if (result != null) {
Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes); Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
for (Entry<byte[], byte[]> entry : columns.entrySet()) { for (Entry<byte[], byte[]> entry : columns.entrySet()) {
if (entry.getKey() != null && entry.getKey().length > 0) { byte[] columnKey = entry.getKey();
if (columnKey != null && columnKey.length > 0) {
String columnName = null; String columnName = null;
if (columnPrefixBytes == null) { if (columnPrefixBytes == null) {
LOG.info("null prefix was specified; returning all columns");
// Decode the spaces we encoded in the column name. // Decode the spaces we encoded in the column name.
columnName = Separator.decode(entry.getKey(), Separator.SPACE); columnName = Separator.decode(columnKey, Separator.SPACE);
} else { } else {
// A non-null prefix means columns are actually of the form // A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder // prefix!columnNameRemainder
byte[][] columnNameParts = byte[][] columnNameParts =
Separator.QUALIFIERS.split(entry.getKey(), 2); Separator.QUALIFIERS.split(columnKey, 2);
byte[] actualColumnPrefixBytes = columnNameParts[0]; byte[] actualColumnPrefixBytes = columnNameParts[0];
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) { && columnNameParts.length == 2) {
// This is the prefix that we want // This is the prefix that we want
// if the column name is a compound qualifier
// with non string datatypes, the following decode will not
// work correctly since it considers all components to be String
// invoke the readResultsHavingCompoundColumnQualifiers function
columnName = Separator.decode(columnNameParts[1]); columnName = Separator.decode(columnNameParts[1]);
} }
} }
@ -222,6 +237,56 @@ public class ColumnHelper<T> {
return results; return results;
} }
/**
* @param result from which to read columns
* @param columnPrefixBytes optional prefix to limit columns. If null all
* columns are returned.
* @return the latest values of columns in the column family. If the column
* prefix is null, the column qualifier is returned as Strings. For a
* non-null column prefix bytes, the column qualifier is returned as
* a list of parts, each part a byte[]. This is to facilitate
* returning byte arrays of values that were not Strings.
* @throws IOException
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result,
byte[] columnPrefixBytes) throws IOException {
// handle the case where the column prefix is null
// it is the same as readResults() so simply delegate to that implementation
if (columnPrefixBytes == null) {
return readResults(result, null);
}
Map<byte[][], Object> results = new HashMap<byte[][], Object>();
if (result != null) {
Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
for (Entry<byte[], byte[]> entry : columns.entrySet()) {
byte[] columnKey = entry.getKey();
if (columnKey != null && columnKey.length > 0) {
// A non-null prefix means columns are actually of the form
// prefix!columnNameRemainder
// with a compound column qualifier, we are presuming existence of a
// prefix
byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
if (columnNameParts.length > 0) {
byte[] actualColumnPrefixBytes = columnNameParts[0];
if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
&& columnNameParts.length == 2) {
// This is the prefix that we want
byte[][] columnQualifierParts =
Separator.VALUES.split(columnNameParts[1]);
Object value = GenericObjectMapper.read(entry.getValue());
// we return the columnQualifier in parts since we don't know
// which part is of which data type
results.put(columnQualifierParts, value);
}
}
}
} // for entry
}
return results;
}
/** /**
* @param columnPrefixBytes The byte representation for the column prefix. * @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}. * Should not contain {@link Separator#QUALIFIERS}.
@ -247,4 +312,24 @@ public class ColumnHelper<T> {
return columnQualifier; return columnQualifier;
} }
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
* @param qualifier the byte representation for the remainder of the column.
* @return fully sanitized column qualifier that is a combination of prefix
* and qualifier. If prefix is null, the result is simply the encoded
* qualifier without any separator.
*/
public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
byte[] qualifier) {
if (columnPrefixBytes == null) {
return qualifier;
}
byte[] columnQualifier =
Separator.QUALIFIERS.join(columnPrefixBytes, qualifier);
return columnQualifier;
}
} }

View File

@ -37,7 +37,7 @@ public enum Separator {
/** /**
* separator in values, and/or compound key/column qualifier fields. * separator in values, and/or compound key/column qualifier fields.
*/ */
VALUES("?", "%1$"), VALUES("=", "%1$"),
/** /**
* separator in values, often used to avoid having these in qualifiers and * separator in values, often used to avoid having these in qualifiers and
@ -299,12 +299,22 @@ public enum Separator {
* up to a maximum of count items. This will naturally produce copied byte * up to a maximum of count items. This will naturally produce copied byte
* arrays for each of the split segments. * arrays for each of the split segments.
* @param source to be split * @param source to be split
* @param limit on how many segments are supposed to be returned. Negative * @param limit on how many segments are supposed to be returned. A
* value indicates no limit on number of segments. * non-positive value indicates no limit on number of segments.
* @return source split by this separator. * @return source split by this separator.
*/ */
public byte[][] split(byte[] source, int limit) { public byte[][] split(byte[] source, int limit) {
return TimelineWriterUtils.split(source, this.bytes, limit); return TimelineWriterUtils.split(source, this.bytes, limit);
} }
/**
* Splits the source array into multiple array segments using this separator,
* as many times as splits are found. This will naturally produce copied byte
* arrays for each of the split segments.
* @param source to be split
* @return source split by this separator.
*/
public byte[][] split(byte[] source) {
return TimelineWriterUtils.split(source, this.bytes);
}
} }

View File

@ -33,6 +33,9 @@ public class TimelineWriterUtils {
/** empty bytes */ /** empty bytes */
public static final byte[] EMPTY_BYTES = new byte[0]; public static final byte[] EMPTY_BYTES = new byte[0];
/** indicator for no limits for splitting */
public static final int NO_LIMIT_SPLIT = -1;
/** /**
* Splits the source array into multiple array segments using the given * Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce * separator, up to a maximum of count items. This will naturally produce
@ -45,7 +48,7 @@ public class TimelineWriterUtils {
* @return byte[] array after splitting the source * @return byte[] array after splitting the source
*/ */
public static byte[][] split(byte[] source, byte[] separator) { public static byte[][] split(byte[] source, byte[] separator) {
return split(source, separator, -1); return split(source, separator, NO_LIMIT_SPLIT);
} }
/** /**
@ -57,7 +60,7 @@ public class TimelineWriterUtils {
* *
* @param source * @param source
* @param separator * @param separator
* @param limit a negative value indicates no limit on number of segments. * @param limit a non-positive value indicates no limit on number of segments.
* @return byte[][] after splitting the input source * @return byte[][] after splitting the input source
*/ */
public static byte[][] split(byte[] source, byte[] separator, int limit) { public static byte[][] split(byte[] source, byte[] separator, int limit) {
@ -81,7 +84,7 @@ public class TimelineWriterUtils {
* separator byte array. * separator byte array.
*/ */
public static List<Range> splitRanges(byte[] source, byte[] separator) { public static List<Range> splitRanges(byte[] source, byte[] separator) {
return splitRanges(source, separator, -1); return splitRanges(source, separator, NO_LIMIT_SPLIT);
} }
/** /**

View File

@ -126,6 +126,31 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
} }
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue) throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *
@ -150,6 +175,21 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return column.readResults(result, columnPrefixBytes); return column.readResults(result, columnPrefixBytes);
} }
/**
* @param result from which to read columns
* @return the latest values of columns in the column family. The column
* qualifier is returned as a list of parts, each part a byte[]. This
* is to facilitate returning byte arrays of values that were not
* Strings. If they can be treated as Strings, you should use
* {@link #readResults(Result)} instead.
* @throws IOException
*/
public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
throws IOException {
return column.readResultsHavingCompoundColumnQualifiers(result,
columnPrefixBytes);
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* *

View File

@ -58,12 +58,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
* | | infoValue | | | * | | infoValue | | |
* | | | | | * | | | | |
* | | r!relatesToKey: | | | * | | r!relatesToKey: | | |
* | | id3?id4?id5 | | | * | | id3=id4=id5 | | |
* | | | | | * | | | | |
* | | s!isRelatedToKey | | | * | | s!isRelatedToKey | | |
* | | id7?id9?id6 | | | * | | id7=id9=id6 | | |
* | | | | | * | | | | |
* | | e!eventId?timestamp?infoKey: | | | * | | e!eventId=timestamp=infoKey: | | |
* | | eventInfoValue | | | * | | eventInfoValue | | |
* | | | | | * | | | | |
* | | flowVersion: | | | * | | flowVersion: | | |

View File

@ -27,8 +27,8 @@ import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
@ -75,7 +76,7 @@ import org.junit.Test;
* even if other records exist in the table. Use a different cluster name if * even if other records exist in the table. Use a different cluster name if
* you add a new test. * you add a new test.
*/ */
public class TestHBaseTimelineWriterImpl { public class TestHBaseTimelineStorage {
private static HBaseTestingUtility util; private static HBaseTestingUtility util;
@ -101,8 +102,8 @@ public class TestHBaseTimelineWriterImpl {
ApplicationEntity entity = new ApplicationEntity(); ApplicationEntity entity = new ApplicationEntity();
String id = "hello"; String id = "hello";
entity.setId(id); entity.setId(id);
Long cTime = 1425016501000L; long cTime = 1425016501000L;
Long mTime = 1425026901000L; long mTime = 1425026901000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime); entity.setModifiedTime(mTime);
@ -197,19 +198,16 @@ public class TestHBaseTimelineWriterImpl {
Number val = Number val =
(Number) ApplicationColumn.CREATED_TIME.readResult(result); (Number) ApplicationColumn.CREATED_TIME.readResult(result);
Long cTime1 = val.longValue(); long cTime1 = val.longValue();
assertEquals(cTime1, cTime); assertEquals(cTime1, cTime);
val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result); val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
Long mTime1 = val.longValue(); long mTime1 = val.longValue();
assertEquals(mTime1, mTime); assertEquals(mTime1, mTime);
Map<String, Object> infoColumns = Map<String, Object> infoColumns =
ApplicationColumnPrefix.INFO.readResults(result); ApplicationColumnPrefix.INFO.readResults(result);
assertEquals(infoMap.size(), infoColumns.size()); assertEquals(infoMap, infoColumns);
for (String infoItem : infoMap.keySet()) {
assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
}
// Remember isRelatedTo is of type Map<String, Set<String>> // Remember isRelatedTo is of type Map<String, Set<String>>
for (String isRelatedToKey : isRelatedTo.keySet()) { for (String isRelatedToKey : isRelatedTo.keySet()) {
@ -245,27 +243,15 @@ public class TestHBaseTimelineWriterImpl {
// Configuration // Configuration
Map<String, Object> configColumns = Map<String, Object> configColumns =
ApplicationColumnPrefix.CONFIG.readResults(result); ApplicationColumnPrefix.CONFIG.readResults(result);
assertEquals(conf.size(), configColumns.size()); assertEquals(conf, configColumns);
for (String configItem : conf.keySet()) {
assertEquals(conf.get(configItem), configColumns.get(configItem));
}
NavigableMap<String, NavigableMap<Long, Number>> metricsResult = NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
// We got metrics back assertEquals(metricValues, metricMap);
assertNotNull(metricMap);
// Same number of metrics as we wrote
assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
// Iterate over original metrics and confirm that they are present
// here.
for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
assertEquals(metricEntry.getValue(),
metricMap.get(metricEntry.getKey()));
}
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id, TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
entity.getType(), entity.getId(), entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL)); EnumSet.of(TimelineReader.Field.ALL));
@ -274,6 +260,31 @@ public class TestHBaseTimelineWriterImpl {
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
// verify attributes
assertEquals(id, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
assertEquals(mTime, e1.getModifiedTime());
Map<String, Object> infoMap2 = e1.getInfo();
assertEquals(infoMap, infoMap2);
Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
assertEquals(isRelatedTo, isRelatedTo2);
Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
assertEquals(relatesTo, relatesTo2);
Map<String, String> conf2 = e1.getConfigs();
assertEquals(conf, conf2);
Set<TimelineMetric> metrics2 = e1.getMetrics();
assertEquals(metrics, metrics2);
for (TimelineMetric metric2 : metrics2) {
Map<Long, Number> metricValues2 = metric2.getValues();
assertEquals(metricValues, metricValues2);
}
} finally { } finally {
if (hbi != null) { if (hbi != null) {
hbi.stop(); hbi.stop();
@ -294,8 +305,8 @@ public class TestHBaseTimelineWriterImpl {
String type = "world"; String type = "world";
entity.setId(id); entity.setId(id);
entity.setType(type); entity.setType(type);
Long cTime = 1425016501000L; long cTime = 1425016501000L;
Long mTime = 1425026901000L; long mTime = 1425026901000L;
entity.setCreatedTime(cTime); entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime); entity.setModifiedTime(mTime);
@ -396,20 +407,16 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(type, type1); assertEquals(type, type1);
Number val = (Number) EntityColumn.CREATED_TIME.readResult(result); Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
Long cTime1 = val.longValue(); long cTime1 = val.longValue();
assertEquals(cTime1, cTime); assertEquals(cTime1, cTime);
val = (Number) EntityColumn.MODIFIED_TIME.readResult(result); val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
Long mTime1 = val.longValue(); long mTime1 = val.longValue();
assertEquals(mTime1, mTime); assertEquals(mTime1, mTime);
Map<String, Object> infoColumns = Map<String, Object> infoColumns =
EntityColumnPrefix.INFO.readResults(result); EntityColumnPrefix.INFO.readResults(result);
assertEquals(infoMap.size(), infoColumns.size()); assertEquals(infoMap, infoColumns);
for (String infoItem : infoMap.keySet()) {
assertEquals(infoMap.get(infoItem),
infoColumns.get(infoItem));
}
// Remember isRelatedTo is of type Map<String, Set<String>> // Remember isRelatedTo is of type Map<String, Set<String>>
for (String isRelatedToKey : isRelatedTo.keySet()) { for (String isRelatedToKey : isRelatedTo.keySet()) {
@ -447,32 +454,19 @@ public class TestHBaseTimelineWriterImpl {
// Configuration // Configuration
Map<String, Object> configColumns = Map<String, Object> configColumns =
EntityColumnPrefix.CONFIG.readResults(result); EntityColumnPrefix.CONFIG.readResults(result);
assertEquals(conf.size(), configColumns.size()); assertEquals(conf, configColumns);
for (String configItem : conf.keySet()) {
assertEquals(conf.get(configItem), configColumns.get(configItem));
}
NavigableMap<String, NavigableMap<Long, Number>> metricsResult = NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
// We got metrics back assertEquals(metricValues, metricMap);
assertNotNull(metricMap);
// Same number of metrics as we wrote
assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
.size());
// Iterate over original metrics and confirm that they are present
// here.
for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
assertEquals(metricEntry.getValue(),
metricMap.get(metricEntry.getKey()));
}
} }
} }
assertEquals(1, rowCount); assertEquals(1, rowCount);
assertEquals(17, colCount); assertEquals(17, colCount);
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(), entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL)); EnumSet.of(TimelineReader.Field.ALL));
@ -481,6 +475,30 @@ public class TestHBaseTimelineWriterImpl {
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
// verify attributes
assertEquals(id, e1.getId());
assertEquals(type, e1.getType());
assertEquals(cTime, e1.getCreatedTime());
assertEquals(mTime, e1.getModifiedTime());
Map<String, Object> infoMap2 = e1.getInfo();
assertEquals(infoMap, infoMap2);
Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
assertEquals(isRelatedTo, isRelatedTo2);
Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
assertEquals(relatesTo, relatesTo2);
Map<String, String> conf2 = e1.getConfigs();
assertEquals(conf, conf2);
Set<TimelineMetric> metrics2 = e1.getMetrics();
assertEquals(metrics, metrics2);
for (TimelineMetric metric2 : metrics2) {
Map<Long, Number> metricValues2 = metric2.getValues();
assertEquals(metricValues, metricValues2);
}
} finally { } finally {
if (hbi != null) { if (hbi != null) {
hbi.stop(); hbi.stop();
@ -494,9 +512,9 @@ public class TestHBaseTimelineWriterImpl {
} }
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, Long runid, String appName, TimelineEntity te) { String flow, long runid, String appName, TimelineEntity te) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1); byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
assertTrue(rowKeyComponents.length == 7); assertTrue(rowKeyComponents.length == 7);
assertEquals(user, Bytes.toString(rowKeyComponents[0])); assertEquals(user, Bytes.toString(rowKeyComponents[0]));
@ -511,9 +529,9 @@ public class TestHBaseTimelineWriterImpl {
} }
private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster, private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
String user, String flow, Long runid, String appName) { String user, String flow, long runid, String appName) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1); byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
assertTrue(rowKeyComponents.length == 5); assertTrue(rowKeyComponents.length == 5);
assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
@ -530,7 +548,7 @@ public class TestHBaseTimelineWriterImpl {
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE; String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
event.setId(eventId); event.setId(eventId);
Long expTs = 1436512802000L; long expTs = 1436512802000L;
event.setTimestamp(expTs); event.setTimestamp(expTs);
String expKey = "foo_event"; String expKey = "foo_event";
Object expVal = "test"; Object expVal = "test";
@ -577,24 +595,25 @@ public class TestHBaseTimelineWriterImpl {
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid, assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
appName)); appName));
Map<String, Object> eventsResult = Map<?, Object> eventsResult =
ApplicationColumnPrefix.EVENT.readResults(result); ApplicationColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result);
// there should be only one event // there should be only one event
assertEquals(1, eventsResult.size()); assertEquals(1, eventsResult.size());
// key name for the event for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
byte[] compoundColumnQualifierBytes = // the qualifier is a compound key
Separator.VALUES.join(Bytes.toBytes(eventId), // hence match individual values
Bytes.toBytes(TimelineWriterUtils.invert(expTs)), byte[][] karr = (byte[][])e.getKey();
Bytes.toBytes(expKey)); assertEquals(3, karr.length);
String valueKey = Bytes.toString(compoundColumnQualifierBytes); assertEquals(eventId, Bytes.toString(karr[0]));
for (Map.Entry<String, Object> e : eventsResult.entrySet()) { assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
// the value key must match assertEquals(expKey, Bytes.toString(karr[2]));
assertEquals(valueKey, e.getKey());
Object value = e.getValue(); Object value = e.getValue();
// there should be only one timestamp and value // there should be only one timestamp and value
assertEquals(expVal, value.toString()); assertEquals(expVal, value.toString());
} }
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(), entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL)); EnumSet.of(TimelineReader.Field.ALL));
@ -613,6 +632,21 @@ public class TestHBaseTimelineWriterImpl {
assertEquals(1, es1.size()); assertEquals(1, es1.size());
assertEquals(1, es2.size()); assertEquals(1, es2.size());
assertEquals(es1, es2); assertEquals(es1, es2);
// check the events
NavigableSet<TimelineEvent> events = e1.getEvents();
// there should be only one event
assertEquals(1, events.size());
for (TimelineEvent e : events) {
assertEquals(eventId, e.getId());
assertEquals(expTs, e.getTimestamp());
Map<String,Object> info = e.getInfo();
assertEquals(1, info.size());
for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
assertEquals(expKey, infoEntry.getKey());
assertEquals(expVal, infoEntry.getValue());
}
}
} finally { } finally {
if (hbi != null) { if (hbi != null) {
hbi.stop(); hbi.stop();
@ -630,7 +664,7 @@ public class TestHBaseTimelineWriterImpl {
TimelineEvent event = new TimelineEvent(); TimelineEvent event = new TimelineEvent();
String eventId = "foo_event_id"; String eventId = "foo_event_id";
event.setId(eventId); event.setId(eventId);
Long expTs = 1436512802000L; long expTs = 1436512802000L;
event.setTimestamp(expTs); event.setTimestamp(expTs);
final TimelineEntity entity = new TimelineEntity(); final TimelineEntity entity = new TimelineEntity();
@ -678,22 +712,21 @@ public class TestHBaseTimelineWriterImpl {
assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
entity)); entity));
Map<String, Object> eventsResult = Map<?, Object> eventsResult =
EntityColumnPrefix.EVENT.readResults(result); EntityColumnPrefix.EVENT.
readResultsHavingCompoundColumnQualifiers(result);
// there should be only one event // there should be only one event
assertEquals(1, eventsResult.size()); assertEquals(1, eventsResult.size());
// key name for the event for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
byte[] compoundColumnQualifierWithTsBytes = // the qualifier is a compound key
Separator.VALUES.join(Bytes.toBytes(eventId), // hence match individual values
Bytes.toBytes(TimelineWriterUtils.invert(expTs))); byte[][] karr = (byte[][])e.getKey();
byte[] compoundColumnQualifierBytes = assertEquals(3, karr.length);
Separator.VALUES.join(compoundColumnQualifierWithTsBytes, assertEquals(eventId, Bytes.toString(karr[0]));
null); assertEquals(TimelineWriterUtils.invert(expTs),
String valueKey = Bytes.toString(compoundColumnQualifierBytes); Bytes.toLong(karr[1]));
for (Map.Entry<String, Object> e : // key must be empty
eventsResult.entrySet()) { assertEquals(0, karr[2].length);
// the column qualifier key must match
assertEquals(valueKey, e.getKey());
Object value = e.getValue(); Object value = e.getValue();
// value should be empty // value should be empty
assertEquals("", value.toString()); assertEquals("", value.toString());
@ -702,6 +735,7 @@ public class TestHBaseTimelineWriterImpl {
} }
assertEquals(1, rowCount); assertEquals(1, rowCount);
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
entity.getType(), entity.getId(), entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL)); EnumSet.of(TimelineReader.Field.ALL));
@ -710,6 +744,17 @@ public class TestHBaseTimelineWriterImpl {
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1); assertNotNull(e1);
assertEquals(1, es1.size()); assertEquals(1, es1.size());
// check the events
NavigableSet<TimelineEvent> events = e1.getEvents();
// there should be only one event
assertEquals(1, events.size());
for (TimelineEvent e : events) {
assertEquals(eventId, e.getId());
assertEquals(expTs, e.getTimestamp());
Map<String,Object> info = e.getInfo();
assertTrue(info == null || info.isEmpty());
}
} finally { } finally {
hbi.stop(); hbi.stop();
hbi.close(); hbi.close();