YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)

Sangjin Lee 2015-09-17 10:34:52 -07:00
parent ee081018e4
commit a68e383921
31 changed files with 3545 additions and 91 deletions


@ -180,6 +180,19 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<additionnalDependencies>
<additionnalDependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</additionnalDependency>
</additionnalDependencies>
</configuration>
</plugin>
</plugins>
</build>
</project>


@ -33,11 +33,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
* This implements a hbase based backend for storing application timeline entity
* This implements a hbase based backend for storing the timeline entity
* information.
* It writes to multiple tables at the backend
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HBaseTimelineWriterImpl extends AbstractService implements
TimelineWriter {
private static final Log LOG = LogFactory
.getLog(HBaseTimelineWriterImpl.class);
private Connection conn;
private TypedBufferedMutator<EntityTable> entityTable;
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
private TypedBufferedMutator<ApplicationTable> applicationTable;
private static final Log LOG = LogFactory
.getLog(HBaseTimelineWriterImpl.class);
private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
private TypedBufferedMutator<FlowRunTable> flowRunTable;
public HBaseTimelineWriterImpl() {
super(HBaseTimelineWriterImpl.class.getName());
@ -91,6 +103,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn);
}
/**
@ -111,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// if the entity is the application, the destination is the application
// table
boolean isApplication = isApplicationEntity(te);
boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
byte[] rowKey = isApplication ?
ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
appId) :
@ -124,37 +138,139 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
storeMetrics(rowKey, te.getMetrics(), isApplication);
storeRelations(rowKey, te, isApplication);
if (isApplicationCreated(te)) {
onApplicationCreated(
clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
if (isApplication) {
if (TimelineWriterUtils.isApplicationCreated(te)) {
onApplicationCreated(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
}
// if it's an application entity, store metrics
storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
appId, te);
// if the application has finished, store its finish time and write the
// final values of all metrics
if (TimelineWriterUtils.isApplicationFinished(te)) {
onApplicationFinished(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
}
}
}
return putStatus;
}
private static boolean isApplicationEntity(TimelineEntity te) {
return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
}
private static boolean isApplicationCreated(TimelineEntity te) {
if (isApplicationEntity(te)) {
for (TimelineEvent event : te.getEvents()) {
if (event.getId().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
return true;
}
}
}
return false;
}
private void onApplicationCreated(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
// store in App to flow table
storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId,
appId, te);
// store in flow run table
storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
// store in flow activity table
storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
}
/*
* updates the {@link FlowActivityTable} with the Application TimelineEntity
* information
*/
private void storeInFlowActivityTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
byte[] qualifier = GenericObjectMapper.write(flowRunId);
FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
null, flowVersion,
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
/*
* updates the {@link FlowRunTable} with Application Created information
*/
private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
flowRunId);
FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
te.getCreatedTime(),
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
private void storeInAppToFlowTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
AppToFlowColumn.FLOW_RUN_ID.store(
rowKey, appToFlowTable, null, flowRunId);
AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
}
/*
* updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
* application has finished
*/
private void onApplicationFinished(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
// store in flow run table
storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
appId, te);
// indicate in the flow activity table that the app has finished
storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
flowRunId, appId, te);
}
/*
* Update the {@link FlowRunTable} with Application Finished information
*/
private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
String flowName, long flowRunId, String appId, TimelineEntity te)
throws IOException {
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
flowRunId);
Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
.getAttribute(appId);
FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
// store the final value of metrics since application has finished
Set<TimelineMetric> metrics = te.getMetrics();
if (metrics != null) {
storeFlowMetrics(rowKey, metrics, attributeAppId,
AggregationOperation.SUM_FINAL.getAttribute());
}
}
/*
* Updates the {@link FlowRunTable} with Application Metrics
*/
private void storeFlowMetricsAppRunning(String clusterId, String userId,
String flowName, long flowRunId, String appId, TimelineEntity te)
throws IOException {
Set<TimelineMetric> metrics = te.getMetrics();
if (metrics != null) {
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
flowRunId);
storeFlowMetrics(rowKey, metrics,
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
}
private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
Attribute... attributes) throws IOException {
for (TimelineMetric metric : metrics) {
String metricColumnQualifier = metric.getId();
Map<Long, Number> timeseries = metric.getValues();
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
Long timestamp = timeseriesEntry.getKey();
FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
attributes);
}
}
}
private void storeRelations(byte[] rowKey, TimelineEntity te,
@ -184,7 +300,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// id3?id4?id5
String compoundValue =
Separator.VALUES.joinEncoded(connectedEntity.getValue());
columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
compoundValue);
}
@ -342,6 +457,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
entityTable.flush();
appToFlowTable.flush();
applicationTable.flush();
flowRunTable.flush();
flowActivityTable.flush();
}
/**
@ -364,6 +481,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
LOG.info("closing the application table");
applicationTable.close();
}
if (flowRunTable != null) {
LOG.info("closing the flow run table");
// The close API performs flushing and releases any resources held
flowRunTable.close();
}
if (flowActivityTable != null) {
LOG.info("closing the flowActivityTable table");
// The close API performs flushing and releases any resources held
flowActivityTable.close();
}
if (conn != null) {
LOG.info("closing the hbase Connection");
conn.close();
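As an editor-added illustration (not part of this commit), the sketch below shows the kind of application entity that exercises the new flow writes in write() above: the CREATED event triggers the app-to-flow, flow run and flow activity updates, the metrics are stored as running values, and the FINISHED event triggers the final SUM_FINAL metric writes. The metric name, ids and the seven-argument write(...) call are assumptions for the sketch; in practice the created and finished events usually arrive in separate writes.

```java
// Hypothetical usage sketch; all values are invented for illustration.
TimelineEntity app = new TimelineEntity();
app.setType(TimelineEntityType.YARN_APPLICATION.toString());
app.setId("application_1441930445900_0011");
app.setCreatedTime(1442511292000L);

TimelineEvent created = new TimelineEvent();
created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
created.setTimestamp(1442511292000L);
app.addEvent(created);

TimelineMetric memory = new TimelineMetric();
memory.setId("MEMORY_MB");                      // assumed metric name
memory.addValue(1442511300000L, 2048);
app.addMetric(memory);

TimelineEvent finished = new TimelineEvent();
finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
finished.setTimestamp(1442511892000L);
app.addEvent(finished);

TimelineEntities entities = new TimelineEntities();
entities.addEntity(app);
// writer is an already started HBaseTimelineWriterImpl instance
writer.write("cluster1", "user1", "flow_name", "flow_version1",
    1002345678919L, app.getId(), entities);
```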


@ -42,6 +42,8 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
* This creates the schema for a hbase based backend for storing application
@ -199,7 +201,7 @@ public class TimelineSchemaCreator {
return commandLine;
}
private static void createAllTables(Configuration hbaseConf,
public static void createAllTables(Configuration hbaseConf,
boolean skipExisting) throws IOException {
Connection conn = null;
@ -236,6 +238,24 @@ public class TimelineSchemaCreator {
throw e;
}
}
try {
new FlowRunTable().createTable(admin, hbaseConf);
} catch (IOException e) {
if (skipExisting) {
LOG.warn("Skip and continue on: " + e.getMessage());
} else {
throw e;
}
}
try {
new FlowActivityTable().createTable(admin, hbaseConf);
} catch (IOException e) {
if (skipExisting) {
LOG.warn("Skip and continue on: " + e.getMessage());
} else {
throw e;
}
}
} finally {
if (conn != null) {
conn.close();
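Since createAllTables is now public rather than private, tests or an embedding service can create the two new flow tables together with the existing ones. A minimal, editor-added sketch, assuming an HBase Configuration is available:

```java
// Minimal sketch; hbaseConf would normally point at a mini-cluster in tests.
Configuration hbaseConf = HBaseConfiguration.create();
// true => log and skip tables that already exist instead of failing
TimelineSchemaCreator.createAllTables(hbaseConf, true);
```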


@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link ApplicationTable}.
@ -76,9 +77,9 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
Object inputValue) throws IOException {
Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue);
inputValue, attributes);
}
public Object readResult(Result result) throws IOException {


@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the application table.
@ -112,7 +113,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue) throws IOException {
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
@ -123,8 +125,9 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
}
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
/*
* (non-Javadoc)
@ -137,7 +140,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
Long timestamp, Object inputValue) throws IOException {
Long timestamp, Object inputValue, Attribute...attributes)
throws IOException {
// Null check
if (qualifier == null) {
@ -148,7 +152,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
/*


@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import java.io.IOException;
import java.util.Map;
/**
* Identifies fully qualified columns for the {@link AppToFlowTable}.
@ -67,9 +69,9 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
Object inputValue) throws IOException {
Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue);
inputValue, attributes);
}
public Object readResult(Result result) throws IOException {


@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* A Column represents the way to store a fully qualified column in a specific
@ -38,12 +39,15 @@ public interface Column<T> {
* column.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
* @param attributes attributes for this mutation, used in the coprocessor
* to set/read the cell tags. Can be null.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
Long timestamp, Object inputValue) throws IOException;
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the

View File

@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* This class is meant to be used only by explicit Columns, and not directly to
* write by clients.
@ -58,33 +59,68 @@ public class ColumnHelper<T> {
* Sends a Mutation to the table. The mutations will be buffered and sent over
* the wire as part of a batch.
*
* @param rowKey identifying the row to write. Nothing gets written when null.
* @param tableMutator used to modify the underlying HBase table
* @param columnQualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @param rowKey
* identifying the row to write. Nothing gets written when null.
* @param tableMutator
* used to modify the underlying HBase table
* @param columnQualifier
* column qualifier. Nothing gets written when null.
* @param timestamp
* version timestamp. When null, the current timestamp multiplied by
* TimestampGenerator.TS_MULTIPLIER plus the last 3 digits of the
* app id will be used
* @param inputValue
* the value to write to the rowKey and column qualifier. Nothing
* gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
byte[] columnQualifier, Long timestamp, Object inputValue)
throws IOException {
byte[] columnQualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException {
if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
return;
}
Put p = new Put(rowKey);
if (timestamp == null) {
p.addColumn(columnFamilyBytes, columnQualifier,
GenericObjectMapper.write(inputValue));
} else {
p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
GenericObjectMapper.write(inputValue));
timestamp = getPutTimestamp(timestamp, attributes);
p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
GenericObjectMapper.write(inputValue));
if ((attributes != null) && (attributes.length > 0)) {
for (Attribute attribute : attributes) {
p.setAttribute(attribute.getName(), attribute.getValue());
}
}
tableMutator.mutate(p);
}
/*
* Figures out the cell timestamp used in the Put for storing into the flow run
* table. We would like to left shift the timestamp and supplement it with the
* app id so that there are no collisions in the flow run table's cells
*/
private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
if (timestamp == null) {
timestamp = System.currentTimeMillis();
}
String appId = getAppIdFromAttributes(attributes);
long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
timestamp, appId);
return supplementedTS;
}
private String getAppIdFromAttributes(Attribute[] attributes) {
if (attributes == null) {
return null;
}
String appId = null;
for (Attribute attribute : attributes) {
if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
attribute.getName())) {
appId = Bytes.toString(attribute.getValue());
}
}
return appId;
}
/**
* @return the column family for this column implementation.
*/
@ -171,7 +207,9 @@ public class ColumnHelper<T> {
for (Entry<Long, byte[]> cell : cells.entrySet()) {
V value =
(V) GenericObjectMapper.read(cell.getValue());
cellResults.put(cell.getKey(), value);
cellResults.put(
TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
value);
}
}
results.put(columnName, cellResults);
@ -312,6 +350,27 @@ public class ColumnHelper<T> {
return columnQualifier;
}
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
* @param qualifier for the remainder of the column.
* @return fully sanitized column qualifier that is a combination of prefix
* and qualifier. If prefix is null, the result is simply the encoded
* qualifier without any separator.
*/
public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
long qualifier) {
if (columnPrefixBytes == null) {
return Bytes.toBytes(qualifier);
}
// Encode the long qualifier as bytes and tack on the column prefix.
byte[] columnQualifier =
Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
return columnQualifier;
}
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.


@ -23,6 +23,7 @@ import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Used to represent a partially qualified column, where the actual column name
@ -43,12 +44,36 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
* @param attributes attributes for the mutation that are used by the coprocessor
* to set/read the cell tags
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
String qualifier, Long timestamp, Object inputValue) throws IOException;
byte[] qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
/**
* Sends a Mutation to the table. The mutations will be buffered and sent over
* the wire as part of a batch.
*
* @param rowKey identifying the row to write. Nothing gets written when null.
* @param tableMutator used to modify the underlying HBase table. Caller is
* responsible to pass a mutator for the table that actually has this
* column.
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
* @param attributes attributes for the mutation that are used by the coprocessor
* to set/read the cell tags
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
String qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the
@ -81,4 +106,5 @@ public interface ColumnPrefix<T> {
*/
public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
}


@ -19,9 +19,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* bunch of utility functions used across TimelineWriter classes
@ -36,6 +46,9 @@ public class TimelineWriterUtils {
/** indicator for no limits for splitting */
public static final int NO_LIMIT_SPLIT = -1;
/** milliseconds in one day */
public static final long MILLIS_ONE_DAY = 86400000L;
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
@ -140,4 +153,176 @@ public class TimelineWriterUtils {
return Long.MAX_VALUE - key;
}
/**
* Returns the timestamp of the start (midnight, 00:00:00) of the day that
* contains the given input timestamp
*
* @param ts
* @return timestamp of that day's beginning (midnight)
*/
public static long getTopOfTheDayTimestamp(long ts) {
long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
return dayTimestamp;
}
/**
* Combines the input array of attributes and the input aggregation operation
* into a new array of attributes.
*
* @param attributes
* @param aggOp
* @return array of combined attributes
*/
public static Attribute[] combineAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
Attribute[] combinedAttributes = new Attribute[newLength];
if (attributes != null) {
System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
}
if (aggOp != null) {
Attribute a2 = aggOp.getAttribute();
combinedAttributes[newLength - 1] = a2;
}
return combinedAttributes;
}
/**
* Returns a number for the new array size. The new array is the combination
* of input array of attributes and the input aggregation operation.
*
* @param attributes
* @param aggOp
* @return the size for the new array
*/
private static int getNewLengthCombinedAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int oldLength = getAttributesLength(attributes);
int aggLength = getAppOpLength(aggOp);
return oldLength + aggLength;
}
private static int getAppOpLength(AggregationOperation aggOp) {
if (aggOp != null) {
return 1;
}
return 0;
}
private static int getAttributesLength(Attribute[] attributes) {
if (attributes != null) {
return attributes.length;
}
return 0;
}
/**
* checks if an application has finished
*
* @param te
* @return true if application has finished else false
*/
public static boolean isApplicationFinished(TimelineEntity te) {
SortedSet<TimelineEvent> allEvents = te.getEvents();
if ((allEvents != null) && (allEvents.size() > 0)) {
TimelineEvent event = allEvents.last();
if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
return true;
}
}
return false;
}
/**
* Gets the time at which an application finished.
*
* @param te
* @return the finish time of the application, or 0 if it has not finished
*/
public static long getApplicationFinishedTime(TimelineEntity te) {
SortedSet<TimelineEvent> allEvents = te.getEvents();
if ((allEvents != null) && (allEvents.size() > 0)) {
TimelineEvent event = allEvents.last();
if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
return event.getTimestamp();
}
}
return 0l;
}
/**
* Checks if the input TimelineEntity object is an ApplicationEntity.
*
* @param te
* @return true if input is an ApplicationEntity, false otherwise
*/
public static boolean isApplicationEntity(TimelineEntity te) {
return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
}
/**
* Checks for the APPLICATION_CREATED event.
*
* @param te
* @return true if the APPLICATION_CREATED event exists, false otherwise
*/
public static boolean isApplicationCreated(TimelineEntity te) {
if (isApplicationEntity(te)) {
for (TimelineEvent event : te.getEvents()) {
if (event.getId()
.equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
return true;
}
}
}
return false;
}
/**
* Returns the first aggregation operation found in the list of input tags,
* or null if none is present
*
* @param tags
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
List<Tag> tags) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
for (Tag tag : tags) {
if (tag.getType() == aggOp.getTagType()) {
return aggOp;
}
}
}
return null;
}
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute
* @return Tag
*/
public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
// an Aggregation Dimension
// Get the Tag type from either
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
return t;
}
AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
.getAggregationCompactionDimension(attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
}
}
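Two small editor-added illustrations of the helpers above; the timestamps and the in-scope put are assumptions, not part of this commit:

```java
// getTopOfTheDayTimestamp: any two timestamps within the same UTC day map to
// that day's midnight.
long a = TimelineWriterUtils.getTopOfTheDayTimestamp(1442539200123L); // 2015-09-18 01:20 UTC
long b = TimelineWriterUtils.getTopOfTheDayTimestamp(1442577000000L); // 2015-09-18 11:50 UTC
// a == b == 1442534400000L, i.e. 2015-09-18 00:00:00 UTC

// getTagFromAttribute: turning a Put's attributes into HBase cell tags, the
// way a flow run coprocessor might; 'put' is an assumed in-scope Put.
List<Tag> tags = new ArrayList<>();
for (Map.Entry<String, byte[]> attr : put.getAttributesMap().entrySet()) {
  Tag tag = TimelineWriterUtils.getTagFromAttribute(attr);
  if (tag != null) {
    tags.add(tag);
  }
}
```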


@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Utility class that allows HBase coprocessors to interact with unique
* timestamps.
*/
public class TimestampGenerator {
/*
* if this is changed, then reading cell timestamps written with older
* multiplier value will not work
*/
public static final long TS_MULTIPLIER = 1000L;
private final AtomicLong lastTimestamp = new AtomicLong();
/**
* Returns the current wall clock time in milliseconds, multiplied by the
* required precision.
*/
public long currentTime() {
// We want to align cell timestamps with current time.
// Cell timestamps should not be less than
// System.currentTimeMillis() * TS_MULTIPLIER.
return System.currentTimeMillis() * TS_MULTIPLIER;
}
/**
* Returns a timestamp value unique within the scope of this
* {@code TimestampGenerator} instance. For usage by HBase
* {@code RegionObserver} coprocessors, this normally means unique within a
* given region.
*
* Unlikely scenario of generating a non-unique timestamp: if there is a
* sustained rate of more than 1M hbase writes per second AND if region fails
* over within that time range of timestamps being generated then there may be
* collisions writing to a cell version of the same column.
*/
public long getUniqueTimestamp() {
long lastTs;
long nextTs;
do {
lastTs = lastTimestamp.get();
nextTs = Math.max(lastTs + 1, currentTime());
} while (!lastTimestamp.compareAndSet(lastTs, nextTs));
return nextTs;
}
/**
* returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
* application id
*
* Unlikely scenario of generating a duplicate timestamp: if more than 1000
* concurrent apps are running in one flow run AND write to the same column at
* the same time, then, say, appId 1001 will overlap with appId 001 and there
* may be collisions for that flow run's specific column.
*
* @param incomingTS
* @param appId
* @return a timestamp multiplied with TS_MULTIPLIER and last few digits of
* application id
*/
public static long getSupplementedTimestamp(long incomingTS, String appId) {
long suffix = getAppIdSuffix(appId);
long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
return outgoingTS;
}
private static long getAppIdSuffix(String appIdStr) {
if (appIdStr == null) {
return 0L;
}
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
long id = appId.getId() % TS_MULTIPLIER;
return id;
}
/**
* truncates the last few digits of the timestamp which were supplemented by
* the TimestampGenerator#getSupplementedTimestamp function
*
* @param incomingTS
* @return a truncated timestamp value
*/
public static long getTruncatedTimestamp(long incomingTS) {
return incomingTS / TS_MULTIPLIER;
}
}
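A quick editor-added worked example of the supplement/truncate round trip (values assumed):

```java
long original = 1442500000000L;
String appId = "application_1441930445900_0011";  // id portion is 11
long cellTs = TimestampGenerator.getSupplementedTimestamp(original, appId);
// cellTs == 1442500000000L * 1000 + 11 == 1442500000000011L
long readBack = TimestampGenerator.getTruncatedTimestamp(cellTs);
// readBack == 1442500000000L, the original timestamp
```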


@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link EntityTable}.
@ -81,9 +83,9 @@ public enum EntityColumn implements Column<EntityTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
Object inputValue) throws IOException {
Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue);
inputValue, attributes);
}
public Object readResult(Result result) throws IOException {


@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the entity table.
@ -108,11 +109,13 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
* org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
Long timestamp, Object inputValue) throws IOException {
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
@ -123,8 +126,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
}
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
/*
* (non-Javadoc)
@ -137,7 +141,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue) throws IOException {
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
@ -148,8 +153,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
}
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
}
/*
* (non-Javadoc)


@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Identifies the compaction dimensions for the data in the {@link FlowRunTable}
* .
*/
public enum AggregationCompactionDimension {
/**
* the application id
*/
APPLICATION_ID((byte) 101);
private byte tagType;
private byte[] inBytes;
private AggregationCompactionDimension(byte tagType) {
this.tagType = tagType;
this.inBytes = Bytes.toBytes(this.name());
}
public Attribute getAttribute(String attributeValue) {
return new Attribute(this.name(), Bytes.toBytes(attributeValue));
}
public byte getTagType() {
return tagType;
}
public byte[] getInBytes() {
return this.inBytes.clone();
}
public static AggregationCompactionDimension getAggregationCompactionDimension(
String aggCompactDimStr) {
for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
.values()) {
if (aggDim.name().equals(aggCompactDimStr)) {
return aggDim;
}
}
return null;
}
}


@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Identifies the attributes to be set for puts into the {@link FlowRunTable}.
* The numbers used for tagType are prime numbers
*/
public enum AggregationOperation {
/**
* When the flow was started.
*/
MIN((byte) 71),
/**
* When it ended.
*/
MAX((byte) 73),
/**
* The metrics of the flow
*/
SUM((byte) 79),
/**
* application running
*/
SUM_FINAL((byte) 83),
/**
* compact
*/
COMPACT((byte) 89);
private byte tagType;
private byte[] inBytes;
private AggregationOperation(byte tagType) {
this.tagType = tagType;
this.inBytes = Bytes.toBytes(this.name());
}
public Attribute getAttribute() {
return new Attribute(this.name(), this.inBytes);
}
public byte getTagType() {
return tagType;
}
public byte[] getInBytes() {
return this.inBytes.clone();
}
/**
* returns the AggregationOperation enum that represents that string
* @param aggOpStr
* @return the AggregationOperation enum that represents that string
*/
public static AggregationOperation getAggregationOperation(String aggOpStr) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
if (aggOp.name().equals(aggOpStr)) {
return aggOp;
}
}
return null;
}
}
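An editor-added illustration of how the enum round-trips between its name, its Attribute form and the lookup helper (not part of this commit):

```java
Attribute sumFinal = AggregationOperation.SUM_FINAL.getAttribute();
// sumFinal.getName()  => "SUM_FINAL"
// sumFinal.getValue() => Bytes.toBytes("SUM_FINAL")
AggregationOperation op = AggregationOperation.getAggregationOperation("SUM_FINAL");
// op == AggregationOperation.SUM_FINAL
```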


@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,10 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
/**
* Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
*/
public class Attribute {
private final String name;
private final byte[] value;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
public Attribute(String name, byte[] value) {
this.name = name;
this.value = value.clone();
}
public String getName() {
return name;
}
public byte[] getValue() {
return value.clone();
}
}


@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Represents the flow activity table column families.
*/
public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {
/**
* Info column family houses known columns, specifically ones included in
* columnfamily filters.
*/
INFO("i");
/**
* Byte representation of this column family.
*/
private final byte[] bytes;
/**
* @param value
* create a column family with this name. Must be lower case and
* without spaces.
*/
private FlowActivityColumnFamily(String value) {
// column families should be lower case and not contain any spaces.
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
}
public byte[] getBytes() {
return Bytes.copy(bytes);
}
}


@ -0,0 +1,243 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
/**
* Identifies partially qualified columns for the {@link FlowActivityTable}
*/
public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> {
/**
* To store run ids of the flows
*/
RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
private final ColumnHelper<FlowActivityTable> column;
private final ColumnFamily<FlowActivityTable> columnFamily;
/**
* Can be null for those cases where the provided column qualifier is the
* entire column name.
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private final AggregationOperation aggOp;
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily
* that this column is stored in.
* @param columnPrefix
* for this column.
*/
private FlowActivityColumnPrefix(
ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
AggregationOperation aggOp) {
column = new ColumnHelper<FlowActivityTable>(columnFamily);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
if (columnPrefix == null) {
this.columnPrefixBytes = null;
} else {
// Future-proof by ensuring the right column prefix hygiene.
this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
.encode(columnPrefix));
}
this.aggOp = aggOp;
}
/**
* @return the column name value
*/
public String getColumnPrefix() {
return columnPrefix;
}
public byte[] getColumnPrefixBytes() {
return columnPrefixBytes.clone();
}
public AggregationOperation getAttribute() {
return aggOp;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
* org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
@Override
public void store(byte[] rowKey,
TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
*/
public Object readResult(Result result, String qualifier) throws IOException {
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
return column.readResult(result, columnQualifier);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result)
*/
public Map<String, Object> readResults(Result result) throws IOException {
return column.readResults(result, columnPrefixBytes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
*/
public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
Result result) throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes);
}
/**
* Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
* is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
* if and only if {@code x.equals(y)} or {@code (x == y == null)}
*
* @param columnPrefix
* Name of the column to retrieve
* @return the corresponding {@link FlowActivityColumnPrefix} or null
*/
public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
// Match column based on value, assume column family matches.
for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix
.values()) {
// Find a match based only on name.
if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
return flowActivityColPrefix;
}
}
// Default to null
return null;
}
/**
* Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
* is no match. The following holds true:
* {@code columnFor(a,x) == columnFor(b,y)} if and only if
* {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
*
* @param columnFamily
* The columnFamily for which to retrieve the column.
* @param columnPrefix
* Name of the column to retrieve
* @return the corresponding {@link FlowActivityColumnPrefix} or null if both
* arguments don't match.
*/
public static final FlowActivityColumnPrefix columnFor(
FlowActivityColumnFamily columnFamily, String columnPrefix) {
// TODO: needs unit test to confirm and need to update javadoc to explain
// null prefix case.
for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix
.values()) {
// Find a match based column family and on name.
if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
&& (((columnPrefix == null) && (flowActivityColumnPrefix
.getColumnPrefix() == null)) || (flowActivityColumnPrefix
.getColumnPrefix().equals(columnPrefix)))) {
return flowActivityColumnPrefix;
}
}
// Default to null
return null;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
* org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
@Override
public void store(byte[] rowKey,
TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
combinedAttributes);
}
}


@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
/**
* Represents a rowkey for the flow activity table.
*/
public class FlowActivityRowKey {
private final String clusterId;
private final long dayTs;
private final String userId;
private final String flowId;
public FlowActivityRowKey(String clusterId, long dayTs, String userId,
String flowId) {
this.clusterId = clusterId;
this.dayTs = dayTs;
this.userId = userId;
this.flowId = flowId;
}
public String getClusterId() {
return clusterId;
}
public long getDayTimestamp() {
return dayTs;
}
public String getUserId() {
return userId;
}
public String getFlowId() {
return flowId;
}
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowId}
*
* Will insert into current day's record in the table
* @param clusterId
* @param userId
* @param flowId
* @return byte array with the row key prefix
*/
public static byte[] getRowKey(String clusterId, String userId, String flowId) {
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
return getRowKey(clusterId, dayTs, userId, flowId);
}
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowId}
*
* @param clusterId
* @param dayTs
* @param userId
* @param flowId
* @return byte array for the row key
*/
public static byte[] getRowKey(String clusterId, long dayTs, String userId,
String flowId) {
return Separator.QUALIFIERS.join(
Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
}
/**
* Given the raw row key as bytes, returns the row key as an object.
*/
public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
if (rowKeyComponents.length < 4) {
throw new IllegalArgumentException("the row key is not valid for "
+ "a flow activity");
}
String clusterId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[0]));
long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
String userId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[2]));
String flowId = Separator.QUALIFIERS.decode(Bytes
.toString(rowKeyComponents[3]));
return new FlowActivityRowKey(clusterId, dayTs, userId, flowId);
}
}
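A rough editor-added round trip for the row key helpers above (identifiers assumed):

```java
byte[] rowKey = FlowActivityRowKey.getRowKey("cluster1", 1442534400000L,
    "user1", "flow_xyz");
FlowActivityRowKey parsed = FlowActivityRowKey.parseRowKey(rowKey);
// parsed.getClusterId()    => "cluster1"
// parsed.getDayTimestamp() => 1442534400000L (that day's midnight)
// parsed.getUserId()       => "user1"
// parsed.getFlowId()       => "flow_xyz"
```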


@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
/**
* The flow activity table has a single column family named info.
* It stores the daily activity record for flows and serves as a quick lookup
* of which flows were running on a given day.
*
* Example flow activity table record:
*
* <pre>
* |-------------------------------------------|
* | Row key | Column Family |
* | | info |
* |-------------------------------------------|
* | clusterId! | r!runid1:version1 |
* | inv Top of | |
* | Day! | r!runid2:version7 |
* | userName! | |
* | flowId | |
* |-------------------------------------------|
* </pre>
*/
public class FlowActivityTable extends BaseTable<FlowActivityTable> {
/** flow activity table prefix */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
/** config param name that specifies the flowactivity table name */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/** default value for flowactivity table name */
public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity";
private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
/** default max number of versions */
public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
public FlowActivityTable() {
super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
* (org.apache.hadoop.hbase.client.Admin,
* org.apache.hadoop.conf.Configuration)
*/
public void createTable(Admin admin, Configuration hbaseConf)
throws IOException {
TableName table = getTableName(hbaseConf);
if (admin.tableExists(table)) {
// do not disable / delete existing table
// similar to the approach taken by map-reduce jobs when
// output directory exists
throw new IOException("Table " + table.getNameAsString()
+ " already exists.");
}
HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
HColumnDescriptor infoCF =
new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
infoCF.setBloomFilterType(BloomType.ROWCOL);
flowActivityTableDescp.addFamily(infoCF);
infoCF.setMinVersions(1);
infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
// TODO: figure the split policy before running in production
admin.createTable(flowActivityTableDescp);
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}
}
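A hedged sketch (not part of this patch) of creating the flow activity table against a running HBase cluster; the configuration setup is assumed, and createTable() deliberately throws if the table already exists.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;

public class CreateFlowActivityTableExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      // fails with an IOException if the table already exists (by design)
      new FlowActivityTable().createTable(admin, conf);
    }
  }
}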

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
/**
* Identifies fully qualified columns for the {@link FlowRunTable}.
*/
public enum FlowRunColumn implements Column<FlowRunTable> {
/**
* When the flow was started. This is the minimum of currently known
* application start times.
*/
MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
AggregationOperation.MIN),
/**
* When the flow ended. This is the maximum of currently known application end
* times.
*/
MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
AggregationOperation.MAX),
/**
* The version of the flow to which this flow run belongs.
*/
FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);
private final ColumnHelper<FlowRunTable> column;
private final ColumnFamily<FlowRunTable> columnFamily;
private final String columnQualifier;
private final byte[] columnQualifierBytes;
private final AggregationOperation aggOp;
private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
String columnQualifier, AggregationOperation aggOp) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.aggOp = aggOp;
// Future-proof by ensuring the right column prefix hygiene.
this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE
.encode(columnQualifier));
this.column = new ColumnHelper<FlowRunTable>(columnFamily);
}
/**
* @return the column name value
*/
private String getColumnQualifier() {
return columnQualifier;
}
public byte[] getColumnQualifierBytes() {
return columnQualifierBytes.clone();
}
public AggregationOperation getAggregationOperation() {
return aggOp;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store
* (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.Long, java.lang.Object,
* org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
public void store(byte[] rowKey,
TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
Object inputValue, Attribute... attributes) throws IOException {
Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
attributes, aggOp);
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
inputValue, combinedAttributes);
}
public Object readResult(Result result) throws IOException {
return column.readResult(result, columnQualifierBytes);
}
/**
* Retrieve a {@link FlowRunColumn} given a name, or null if there is no
* match. The following holds true: {@code columnFor(x) == columnFor(y)} if
* and only if {@code x.equals(y)} or {@code (x == y == null)}
*
* @param columnQualifier
* Name of the column to retrieve
* @return the corresponding {@link FlowRunColumn} or null
*/
public static final FlowRunColumn columnFor(String columnQualifier) {
// Match column based on value, assume column family matches.
for (FlowRunColumn ec : FlowRunColumn.values()) {
// Find a match based only on name.
if (ec.getColumnQualifier().equals(columnQualifier)) {
return ec;
}
}
// Default to null
return null;
}
/**
* Retrieve a {@link FlowRunColumn} given a name, or null if there is no
* match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
* if and only if {@code a.equals(b) & x.equals(y)} or
* {@code (x == y == null)}
*
* @param columnFamily
* The columnFamily for which to retrieve the column.
* @param name
* Name of the column to retrieve
* @return the corresponding {@link FlowRunColumn} or null if both arguments
* don't match.
*/
public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily,
String name) {
for (FlowRunColumn ec : FlowRunColumn.values()) {
// Find a match based column family and on name.
if (ec.columnFamily.equals(columnFamily)
&& ec.getColumnQualifier().equals(name)) {
return ec;
}
}
// Default to null
return null;
}
}
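A small sketch (not in the patch) of reading the aggregated start and end times from a flow run row; the Result is assumed to come from a Get against the flow run table issued elsewhere.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;

public final class FlowRunTimesReader {
  // Prints the flow run's min start time and max end time; either value may
  // be null if the corresponding column has not been written yet.
  static void printFlowRunTimes(Result result) throws IOException {
    Object minStart = FlowRunColumn.MIN_START_TIME.readResult(result);
    Object maxEnd = FlowRunColumn.MAX_END_TIME.readResult(result);
    System.out.println("min_start_time=" + minStart + ", max_end_time=" + maxEnd);
  }
}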

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Represents the flow run table column families.
*/
public enum FlowRunColumnFamily implements ColumnFamily<FlowRunTable> {
/**
* Info column family houses known columns, specifically ones included in
* columnfamily filters.
*/
INFO("i");
/**
* Byte representation of this column family.
*/
private final byte[] bytes;
/**
* @param value
* create a column family with this name. Must be lower case and
* without spaces.
*/
private FlowRunColumnFamily(String value) {
// column families should be lower case and not contain any spaces.
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
}
public byte[] getBytes() {
return Bytes.copy(bytes);
}
}

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
/**
* Identifies partially qualified columns for the {@link FlowRunTable}.
*/
public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/**
* To store flow run info values.
*/
METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM);
private final ColumnHelper<FlowRunTable> column;
private final ColumnFamily<FlowRunTable> columnFamily;
/**
* Can be null for those cases where the provided column qualifier is the
* entire column name.
*/
private final String columnPrefix;
private final byte[] columnPrefixBytes;
private final AggregationOperation aggOp;
/**
* Private constructor, meant to be used by the enum definition.
*
* @param columnFamily
* that this column is stored in.
* @param columnPrefix
* for this column.
*/
private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
String columnPrefix, AggregationOperation fra) {
column = new ColumnHelper<FlowRunTable>(columnFamily);
this.columnFamily = columnFamily;
this.columnPrefix = columnPrefix;
if (columnPrefix == null) {
this.columnPrefixBytes = null;
} else {
// Future-proof by ensuring the right column prefix hygiene.
this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
.encode(columnPrefix));
}
this.aggOp = fra;
}
/**
* @return the column name value
*/
public String getColumnPrefix() {
return columnPrefix;
}
public byte[] getColumnPrefixBytes() {
return columnPrefixBytes.clone();
}
public AggregationOperation getAttribute() {
return aggOp;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
*/
public void store(byte[] rowKey,
TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
* TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
*/
public void store(byte[] rowKey,
TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
Long timestamp, Object inputValue, Attribute... attributes)
throws IOException {
// Null check
if (qualifier == null) {
throw new IOException("Cannot store column with null qualifier in "
+ tableMutator.getName().getNameAsString());
}
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
combinedAttributes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
*/
public Object readResult(Result result, String qualifier) throws IOException {
byte[] columnQualifier = ColumnHelper.getColumnQualifier(
this.columnPrefixBytes, qualifier);
return column.readResult(result, columnQualifier);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResults(org.apache.hadoop.hbase.client.Result)
*/
public Map<String, Object> readResults(Result result) throws IOException {
return column.readResults(result, columnPrefixBytes);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
*/
public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
Result result) throws IOException {
return column.readResultsWithTimestamps(result, columnPrefixBytes);
}
/**
* Retrieve a {@link FlowRunColumnPrefix} given a name, or null if there is
* no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
* and only if {@code x.equals(y)} or {@code (x == y == null)}
*
* @param columnPrefix
* Name of the column to retrieve
* @return the corresponding {@link FlowRunColumnPrefix} or null
*/
public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
// Match column based on value, assume column family matches.
for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
// Find a match based only on name.
if (frcp.getColumnPrefix().equals(columnPrefix)) {
return frcp;
}
}
// Default to null
return null;
}
/**
* Retrieve a {@link FlowRunColumnPrefix} given a name, or null if there is
* no match. The following holds true:
* {@code columnFor(a,x) == columnFor(b,y)} if and only if
* {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
*
* @param columnFamily
* The columnFamily for which to retrieve the column.
* @param columnPrefix
* Name of the column to retrieve
* @return the corresponding {@link FlowRunColumnPrefix} or null if both
* arguments don't match.
*/
public static final FlowRunColumnPrefix columnFor(
FlowRunColumnFamily columnFamily, String columnPrefix) {
// TODO: needs unit test to confirm and need to update javadoc to explain
// null prefix case.
for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
// Find a match based column family and on name.
if (frcp.columnFamily.equals(columnFamily)
&& (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp
.getColumnPrefix().equals(columnPrefix)))) {
return frcp;
}
}
// Default to null
return null;
}
}
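Another hedged sketch: listing every aggregated metric stored under the "m" prefix for a flow run row. The Result is assumed to come from a Get or Scan on the flow run table, so the values a reader sees here are the sums produced server-side by FlowScanner.

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;

public final class FlowRunMetricsReader {
  // result is assumed to have been fetched elsewhere from the flow run table
  static void printAggregatedMetrics(Result result) throws IOException {
    Map<String, Object> metrics = FlowRunColumnPrefix.METRIC.readResults(result);
    for (Map.Entry<String, Object> metric : metrics.entrySet()) {
      System.out.println(metric.getKey() + " = " + metric.getValue());
    }
  }
}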

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
public class FlowRunCoprocessor extends BaseRegionObserver {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private HRegion region;
/**
* Generates a timestamp that is unique per row within a region; there is one
* generator per region.
*/
private final TimestampGenerator timestampGenerator = new TimestampGenerator();
@Override
public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
}
}
/*
* (non-Javadoc)
*
* This method adds the tags onto the cells in the Put. It is presumed that
* all the cells in one Put have the same set of Tags. The existing cell
* timestamp is overwritten for non-metric cells and each such cell gets a new
* unique timestamp generated by {@link TimestampGenerator}
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
* .hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Put,
* org.apache.hadoop.hbase.regionserver.wal.WALEdit,
* org.apache.hadoop.hbase.client.Durability)
*/
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
WALEdit edit, Durability durability) throws IOException {
Map<String, byte[]> attributes = put.getAttributesMap();
// Assumption is that all the cells in a put are the same operation.
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
tags.add(t);
}
byte[] tagByteArray = Tag.fromList(tags);
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
.entrySet()) {
List<Cell> newCells = new ArrayList<>(entry.getValue().size());
for (Cell cell : entry.getValue()) {
// for each cell in the put add the tags
// Assumption is that all the cells in
// one put are the same operation
// also, get a unique cell timestamp for non-metric cells
// this way we don't inadvertently overwrite cell versions
long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
tagByteArray));
}
newFamilyMap.put(entry.getKey(), newCells);
} // for each entry
// Update the family map for the Put
put.setFamilyCellMap(newFamilyMap);
}
}
/**
* Determines if the current cell's timestamp is to be used or a new unique
* cell timestamp is to be used. A new unique timestamp avoids inadvertently
* overwriting cells when writes come in very fast. But for metric cells, the
* cell timestamp signifies the metric timestamp. Hence we don't want to
* overwrite it.
*
* @param timestamp
* @param tags
* @return cell timestamp
*/
private long getCellTimestamp(long timestamp, List<Tag> tags) {
// if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
// then use the generator
if (timestamp == HConstants.LATEST_TIMESTAMP) {
return timestampGenerator.getUniqueTimestamp();
} else {
return timestamp;
}
}
/*
* (non-Javadoc)
*
* Creates a {@link FlowScanner} so that it can correctly process the
* contents of {@link FlowRunTable}.
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
* .hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Get, java.util.List)
*/
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
Get get, List<Cell> results) throws IOException {
Scan scan = new Scan(get);
scan.setMaxVersions();
RegionScanner scanner = null;
try {
scanner = new FlowScanner(region, scan.getBatch(),
region.getScanner(scan));
scanner.next(results);
e.bypass();
} finally {
if (scanner != null) {
scanner.close();
}
}
}
/*
* (non-Javadoc)
*
* Ensures that max versions are set for the Scan so that metrics can be
* correctly aggregated and min/max can be correctly determined.
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
* .apache.hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Scan,
* org.apache.hadoop.hbase.regionserver.RegionScanner)
*/
@Override
public RegionScanner preScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner s) throws IOException {
// set max versions for scan to see all
// versions to aggregate for metrics
scan.setMaxVersions();
return s;
}
/*
* (non-Javadoc)
*
* Creates a {@link FlowScanner} so that it can correctly process the
* contents of {@link FlowRunTable}.
*
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
* org.apache.hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Scan,
* org.apache.hadoop.hbase.regionserver.RegionScanner)
*/
@Override
public RegionScanner postScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException {
return new FlowScanner(region, scan.getBatch(), scanner);
}
}
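For context, a sketch of how this coprocessor gets attached to the table descriptor; FlowRunTable#createTable (later in this patch) does the same thing at table creation time, so this fragment is illustrative only and the table name is the default one.

import java.io.IOException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunCoprocessor;

public final class AttachFlowRunCoprocessorExample {
  static HTableDescriptor describeFlowRunTable() throws IOException {
    HTableDescriptor desc =
        new HTableDescriptor(TableName.valueOf("timelineservice.flowrun"));
    // every Put/Get/Scan on regions of this table then passes through
    // FlowRunCoprocessor and, for reads, FlowScanner
    desc.addCoprocessor(FlowRunCoprocessor.class.getCanonicalName());
    return desc;
  }
}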

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
/**
* Represents a row key for the flow run table.
*/
public class FlowRunRowKey {
// TODO: more methods are needed for this class like parse row key
/**
* Constructs a row key for the flow run table as follows:
* {@code clusterId!userId!flowId!invertedFlowRunId}
*
* @param clusterId
* @param userId
* @param flowId
* @param flowRunId
* @return byte array with the row key
*/
public static byte[] getRowKey(String clusterId, String userId,
String flowId, Long flowRunId) {
byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
userId, flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
return Separator.QUALIFIERS.join(first, second);
}
}
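A minimal sketch (hypothetical values) of building a flow run row key for a point Get; the flow run id is stored inverted so that the most recent run sorts first.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;

public final class FlowRunRowKeyExample {
  // returns a Get targeting one specific flow run row
  static Get flowRunGet() {
    byte[] rowKey = FlowRunRowKey.getRowKey(
        "cluster1", "user1", "flow_wordcount", 1002345678919L);
    return new Get(rowKey);
  }
}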

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
/**
* The flow run table has a single column family named info.
* It stores per-flow-run information aggregated across applications.
*
* Metrics are also stored in the info column family.
*
* Example flow run table record:
*
* <pre>
* flow_run table
* |-------------------------------------------|
* | Row key | Column Family |
* | | info |
* |-------------------------------------------|
* | clusterId! | flow_version:version7 |
* | userName! | |
* | flowId! | running_apps:1 |
* | flowRunId | |
* | | min_start_time:1392995080000 |
* | | #0:"" |
* | | |
* | | min_start_time:1392995081012 |
* | | #0:appId2 |
* | | |
* | | min_start_time:1392993083210 |
* | | #0:appId3 |
* | | |
* | | |
* | | max_end_time:1392993084018 |
* | | #0:"" |
* | | |
* | | |
* | | m!mapInputRecords:127 |
* | | #0:"" |
* | | |
* | | m!mapInputRecords:31 |
* | | #2:appId2 |
* | | |
* | | m!mapInputRecords:37 |
* | | #1:appId3 |
* | | |
* | | |
* | | m!mapOutputRecords:181 |
* | | #0:"" |
* | | |
* | | m!mapOutputRecords:37 |
* | | #1:appId3 |
* | | |
* | | |
* |-------------------------------------------|
* </pre>
*/
public class FlowRunTable extends BaseTable<FlowRunTable> {
/** entity prefix */
private static final String PREFIX =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
/** config param name that specifies the flowrun table name */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
/** default value for flowrun table name */
public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
/** default max number of versions */
public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
public FlowRunTable() {
super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
* (org.apache.hadoop.hbase.client.Admin,
* org.apache.hadoop.conf.Configuration)
*/
public void createTable(Admin admin, Configuration hbaseConf)
throws IOException {
TableName table = getTableName(hbaseConf);
if (admin.tableExists(table)) {
// do not disable / delete existing table
// similar to the approach taken by map-reduce jobs when
// output directory exists
throw new IOException("Table " + table.getNameAsString()
+ " already exists.");
}
HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
HColumnDescriptor infoCF =
new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
infoCF.setBloomFilterType(BloomType.ROWCOL);
flowRunTableDescp.addFamily(infoCF);
infoCF.setMinVersions(1);
infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
// TODO: figure the split policy
flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
.getCanonicalName());
admin.createTable(flowRunTableDescp);
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}
}
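A hedged end-to-end read sketch (not part of the patch): because FlowRunCoprocessor routes reads through FlowScanner, a plain Get against this table already returns aggregated values. The Connection is assumed to be set up elsewhere, the row key values are hypothetical, and the default table name is used where a configured name would normally come from getTableName(conf).

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

public final class FlowRunReadExample {
  static void readFlowRun(Connection conn) throws IOException {
    byte[] rowKey = FlowRunRowKey.getRowKey(
        "cluster1", "user1", "flow_wordcount", 1002345678919L);
    TableName name = TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME);
    try (Table table = conn.getTable(name)) {
      Result result = table.get(new Get(rowKey));
      Object minStart = FlowRunColumn.MIN_START_TIME.readResult(result);
      Map<String, Object> metrics = FlowRunColumnPrefix.METRIC.readResults(result);
      System.out.println("start=" + minStart + ", metrics=" + metrics);
    }
  }
}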

View File

@ -0,0 +1,486 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Invoked via the coprocessor when a Get or a Scan is issued for the flow run
* table. Looks through the list of cells per row, checks their tags, and
* processes those cells according to the tags. Transforms reads of the stored
* metrics into calculated sums for each column, and also finds the min and max
* for the start and end times of a flow run.
*/
class FlowScanner implements RegionScanner, Closeable {
private static final Log LOG = LogFactory.getLog(FlowScanner.class);
private final HRegion region;
private final InternalScanner flowRunScanner;
private RegionScanner regionScanner;
private final int limit;
private boolean hasMore;
private byte[] currentRow;
private List<Cell> availableCells = new ArrayList<>();
private int currentIndex;
FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
this.region = region;
this.limit = limit;
this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner;
}
// TODO: note if it's compaction/flush
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
*/
@Override
public HRegionInfo getRegionInfo() {
return region.getRegionInfo();
}
@Override
public boolean nextRaw(List<Cell> cells) throws IOException {
return nextRaw(cells, limit);
}
@Override
public boolean nextRaw(List<Cell> cells, int limit) throws IOException {
return nextInternal(cells, limit);
}
@Override
public boolean next(List<Cell> cells) throws IOException {
return next(cells, limit);
}
@Override
public boolean next(List<Cell> cells, int limit) throws IOException {
return nextInternal(cells, limit);
}
private String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
}
}
return appId;
}
/**
* This method loops through the cells in a given row of the
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how
* to process the contents. It then calculates the sum or min or max for each
* column or returns the cell as is.
*
* @param cells
* @param limit
* @return true if next row is available for the scanner, false otherwise
* @throws IOException
*/
private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
Cell cell = null;
startNext();
// Loop through all the cells in this row
// For min/max/metrics we do need to scan the entire set of cells to get the
// right one
// But with flush/compaction, the number of cells being scanned will go down
// cells are grouped per column qualifier then sorted by cell timestamp
// (latest to oldest) per column qualifier
// So all cells in one qualifier come one after the other before we see the
// next column qualifier
ByteArrayComparator comp = new ByteArrayComparator();
byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
AggregationOperation currentAggOp = null;
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0;
while (((cell = peekAtNextCell(limit)) != null)
&& (limit <= 0 || addedCnt < limit)) {
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier;
currentAggOp = getCurrentAggOp(cell);
}
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
nextCell(limit);
}
if (!currentColumnCells.isEmpty()) {
emitCells(cells, currentColumnCells, currentAggOp);
}
return hasMore();
}
private AggregationOperation getCurrentAggOp(Cell cell) {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
}
/**
* Resets the parameters to an initialized state for the next loop iteration.
*
* @param currentColumnCells
* @param alreadySeenAggDim
*/
private void resetState(SortedSet<Cell> currentColumnCells,
Set<String> alreadySeenAggDim) {
currentColumnCells.clear();
alreadySeenAggDim.clear();
}
private void collectCells(SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, Cell cell,
Set<String> alreadySeenAggDim) throws IOException {
if (currentAggOp == null) {
// not a min/max/metric cell, so just return it as is
currentColumnCells.add(cell);
nextCell(limit);
return;
}
switch (currentAggOp) {
case MIN:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
Cell currentMinCell = currentColumnCells.first();
Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
if (!currentMinCell.equals(newMinCell)) {
currentColumnCells.remove(currentMinCell);
currentColumnCells.add(newMinCell);
}
}
break;
case MAX:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
Cell currentMaxCell = currentColumnCells.first();
Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
if (!currentMaxCell.equals(newMaxCell)) {
currentColumnCells.remove(currentMaxCell);
currentColumnCells.add(newMaxCell);
}
}
break;
case SUM:
case SUM_FINAL:
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String aggDim = getAggregationCompactionDimension(tags);
if (alreadySeenAggDim.contains(aggDim)) {
// if this agg dimension has already been seen,
// since they show up in sorted order
// we drop the rest which are older
// in other words, this cell is older than previously seen cells
// for that agg dim
} else {
// not seen this agg dim, hence consider this cell in our working set
currentColumnCells.add(cell);
alreadySeenAggDim.add(aggDim);
}
break;
default:
break;
} // end of switch case
}
/*
* Processes the cells in input param currentColumnCells and populates
* List<Cell> cells as the output based on the input AggregationOperation
* parameter.
*/
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0;
}
if (currentAggOp == null) {
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
switch (currentAggOp) {
case MIN:
case MAX:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case SUM:
case SUM_FINAL:
Cell sumCell = processSummation(currentColumnCells);
cells.add(sumCell);
return 1;
default:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
}
/*
* Returns a cell whose value is the sum of all cell values in the input set.
* The new cell created has the timestamp of the most recent metric cell. The
* sum of a metric for a flow run is the summation at the point of the last
* metric update in that flow till that time.
*/
private Cell processSummation(SortedSet<Cell> currentColumnCells)
throws IOException {
Number sum = 0;
Number currentValue = 0;
long ts = 0L;
long mostCurrentTimestamp = 0L;
Cell mostRecentCell = null;
for (Cell cell : currentColumnCells) {
currentValue = (Number) GenericObjectMapper.read(CellUtil
.cloneValue(cell));
ts = cell.getTimestamp();
if (mostCurrentTimestamp < ts) {
mostCurrentTimestamp = ts;
mostRecentCell = cell;
}
sum = sum.longValue() + currentValue.longValue();
}
Cell sumCell = createNewCell(mostRecentCell, sum);
return sumCell;
}
/**
* Determines which cell is to be returned based on the values in each cell
* and the comparison operation MIN or MAX.
*
* @param previouslyChosenCell
* @param currentCell
* @param currentAggOp
* @return the cell which is the min (or max) cell
* @throws IOException
*/
private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
AggregationOperation currentAggOp) throws IOException {
if (previouslyChosenCell == null) {
return currentCell;
}
try {
long previouslyChosenCellValue = ((Number) GenericObjectMapper
.read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
.cloneValue(currentCell))).longValue();
switch (currentAggOp) {
case MIN:
if (currentCellValue < previouslyChosenCellValue) {
// new value is minimum, hence return this cell
return currentCell;
} else {
// previously chosen value is minimum, hence return previous min cell
return previouslyChosenCell;
}
case MAX:
if (currentCellValue > previouslyChosenCellValue) {
// new value is max, hence return this cell
return currentCell;
} else {
// previously chosen value is max, hence return previous max cell
return previouslyChosenCell;
}
default:
return currentCell;
}
} catch (IllegalArgumentException iae) {
LOG.error("caught iae during conversion to long ", iae);
return currentCell;
}
}
private Cell createNewCell(Cell origCell, Number number) throws IOException {
byte[] newValue = GenericObjectMapper.write(number);
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
@Override
public void close() throws IOException {
flowRunScanner.close();
}
/**
* Called to signal the start of the next() call by the scanner.
*/
public void startNext() {
currentRow = null;
}
/**
* Returns whether or not the underlying scanner has more rows.
*/
public boolean hasMore() {
return currentIndex < availableCells.size() ? true : hasMore;
}
/**
* Returns the next available cell for the current row and advances the
* pointer to the next cell. This method can be called multiple times in a row
* to advance through all the available cells.
*
* @param limit
* the limit of number of cells to return if the next batch must be
* fetched by the wrapped scanner
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException
*/
public Cell nextCell(int limit) throws IOException {
Cell cell = peekAtNextCell(limit);
if (cell != null) {
currentIndex++;
}
return cell;
}
/**
* Returns the next available cell for the current row, without advancing the
* pointer. Calling this method multiple times in a row will continue to
* return the same cell.
*
* @param limit
* the limit of number of cells to return if the next batch must be
* fetched by the wrapped scanner
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException
*/
public Cell peekAtNextCell(int limit) throws IOException {
if (currentIndex >= availableCells.size()) {
// done with current batch
availableCells.clear();
currentIndex = 0;
hasMore = flowRunScanner.next(availableCells, limit);
}
Cell cell = null;
if (currentIndex < availableCells.size()) {
cell = availableCells.get(currentIndex);
if (currentRow == null) {
currentRow = CellUtil.cloneRow(cell);
} else if (!CellUtil.matchingRow(cell, currentRow)) {
// moved on to the next row
// don't use the current cell
// also signal no more cells for this row
return null;
}
}
return cell;
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
*/
@Override
public long getMaxResultSize() {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.isFilterDone() called when the flow "
+ "scanner's scanner is not a RegionScanner");
}
return regionScanner.getMaxResultSize();
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
*/
@Override
public long getMvccReadPoint() {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.isFilterDone() called when the flow "
+ "scanner's internal scanner is not a RegionScanner");
}
return regionScanner.getMvccReadPoint();
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
*/
@Override
public boolean isFilterDone() throws IOException {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.isFilterDone() called when the flow "
+ "scanner's internal scanner is not a RegionScanner");
}
return regionScanner.isFilterDone();
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
*/
@Override
public boolean reseek(byte[] bytes) throws IOException {
if (regionScanner == null) {
throw new IllegalStateException(
"RegionScanner.reseek() called when the flow "
+ "scanner's internal scanner is not a RegionScanner");
}
return regionScanner.reseek(bytes);
}
}
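A tiny sketch of the value encoding the scanner relies on: cell values are written with GenericObjectMapper and read back as Numbers before being summed, as processSummation() above does; the two values stand in for hypothetical per-application metric cells of the same column.

import java.io.IOException;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;

public final class SummationSketch {
  public static void main(String[] args) throws IOException {
    // e.g. m!mapInputRecords cells written by two different applications
    byte[] fromApp1 = GenericObjectMapper.write(127L);
    byte[] fromApp2 = GenericObjectMapper.write(31L);
    long sum = ((Number) GenericObjectMapper.read(fromApp1)).longValue()
        + ((Number) GenericObjectMapper.read(fromApp2)).longValue();
    System.out.println(sum); // 158, what a reader of the flow run row would see
  }
}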

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@ -88,20 +87,15 @@ public class TestHBaseTimelineStorage {
}
private static void createSchema() throws IOException {
new EntityTable()
.createTable(util.getHBaseAdmin(), util.getConfiguration());
new AppToFlowTable()
.createTable(util.getHBaseAdmin(), util.getConfiguration());
new ApplicationTable()
.createTable(util.getHBaseAdmin(), util.getConfiguration());
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void testWriteApplicationToHBase() throws Exception {
TimelineEntities te = new TimelineEntities();
ApplicationEntity entity = new ApplicationEntity();
String id = "hello";
entity.setId(id);
String appId = "application_1000178881110_2002";
entity.setId(appId);
long cTime = 1425016501000L;
long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
@ -173,12 +167,12 @@ public class TestHBaseTimelineStorage {
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
hbi.write(cluster, user, flow, flowVersion, runid, id, te);
hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
hbi.stop();
// retrieve the row
byte[] rowKey =
ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@ -190,11 +184,11 @@ public class TestHBaseTimelineStorage {
// check the row key
byte[] row1 = result.getRow();
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
id));
appId));
// check info column family
String id1 = ApplicationColumn.ID.readResult(result).toString();
assertEquals(id, id1);
assertEquals(appId, id1);
Number val =
(Number) ApplicationColumn.CREATED_TIME.readResult(result);
@ -252,17 +246,17 @@ public class TestHBaseTimelineStorage {
assertEquals(metricValues, metricMap);
// read the timeline entity using the reader this time
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
id, entity.getType(), null, null, null, null, null, null, null,
appId, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
// verify attributes
assertEquals(id, e1.getId());
assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
@ -576,7 +570,7 @@ public class TestHBaseTimelineStorage {
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L;
String appName = "some app name";
String appName = "application_123465899910_1001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
/**
* Generates the data/entities for the FlowRun and FlowActivity Tables
*/
class TestFlowDataGenerator {
private final static String metric1 = "MAP_SLOT_MILLIS";
private final static String metric2 = "HDFS_BYTES_READ";
static TimelineEntity getEntityMetricsApp1() {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
Long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricValues.put(ts - 100000, 2);
metricValues.put(ts - 80000, 40);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
metricValues = new HashMap<Long, Number>();
ts = System.currentTimeMillis();
metricValues.put(ts - 100000, 31);
metricValues.put(ts - 80000, 57);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues);
metrics.add(m2);
entity.addMetrics(metrics);
return entity;
}
static TimelineEntity getEntityMetricsApp2() {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
Long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricValues.put(ts - 100000, 5L);
metricValues.put(ts - 80000, 101L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
entity.addMetrics(metrics);
return entity;
}
static TimelineEntity getEntity1() {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunHello";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
Long cTime = 20000000000000L;
Long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
metricValues.put(ts - 120000, 100000000);
metricValues.put(ts - 100000, 200000000);
metricValues.put(ts - 80000, 300000000);
metricValues.put(ts - 60000, 400000000);
metricValues.put(ts - 40000, 50000000000L);
metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
entity.addMetrics(metrics);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
Long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
entity.addEvent(event);
event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
event.setTimestamp(1436512801000L);
event.addInfo(expKey, expVal);
entity.addEvent(event);
return entity;
}
static TimelineEntity getEntityGreaterStartTime() {
TimelineEntity entity = new TimelineEntity();
entity.setCreatedTime(30000000000000L);
entity.setId("flowRunHello with greater start time");
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setType(type);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
long endTs = 1439379885000L;
event.setTimestamp(endTs);
String expKey = "foo_event_greater";
String expVal = "test_app_greater";
event.addInfo(expKey, expVal);
entity.addEvent(event);
return entity;
}
static TimelineEntity getEntityMaxEndTime(long endTs) {
TimelineEntity entity = new TimelineEntity();
entity.setId("flowRunHello Max End time");
entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
event.setTimestamp(endTs);
String expKey = "foo_even_max_ finished";
String expVal = "test_app_max_finished";
event.addInfo(expKey, expVal);
entity.addEvent(event);
return entity;
}
static TimelineEntity getEntityMinStartTime() {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunHelloMInStartTime";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
Long cTime = 10000000000000L;
entity.setCreatedTime(cTime);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(System.currentTimeMillis());
entity.addEvent(event);
return entity;
}
static TimelineEntity getFlowApp1() {
TimelineEntity entity = new TimelineEntity();
String id = "flowActivity_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
Long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
Long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
entity.addEvent(event);
return entity;
}
}

View File

@ -0,0 +1,372 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests the FlowActivity table
*/
public class TestHBaseStorageFlowActivity {
private static HBaseTestingUtility util;
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
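// HBase cell tags (used to carry the flow run aggregation attributes)
// require HFile format version 3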
conf.setInt("hfile.format.version", 3);
util.startMiniCluster();
createSchema();
}
private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
/**
* Writes 4 timeline entities belonging to one flow run through the
* {@link HBaseTimelineWriterImpl}
*
* Checks the flow activity table contents.
*
* The first entity has a created event, metrics and a finished event.
*
* The second entity has a created event and is the entity with the smallest
* start time. This should become the start time of the flow run.
*
* The third entity has a finished event and is the entity with the max end
* time. This should become the end time of the flow run.
*
* The fourth entity has a created event whose start time is greater than the
* min start time.
*
* The test then checks that the flow activity table contains exactly one
* entry covering all 4 application entities, since they belong to the same
* flow run.
*/
@Test
public void testWriteFlowRunMinMax() throws Exception {
TimelineEntities te = new TimelineEntities();
te.addEntity(TestFlowDataGenerator.getEntity1());
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L;
String appName = "application_100000000000_1111";
long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity with the right min start time
te = new TimelineEntities();
te.addEntity(entityMinStartTime);
appName = "application_100000000000_3333";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity for the max end time
TimelineEntity entityMaxEndTime = TestFlowDataGenerator
.getEntityMaxEndTime(endTs);
te = new TimelineEntities();
te.addEntity(entityMaxEndTime);
appName = "application_100000000000_4444";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity with a greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
.getEntityGreaterStartTime();
te = new TimelineEntities();
te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// flush everything to hbase
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow activity table
Table table1 = conn.getTable(TableName
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
Get g = new Get(startRow);
Result r1 = table1.get(g);
assertNotNull(r1);
assertTrue(!r1.isEmpty());
Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
.getBytes());
assertEquals(1, values.size());
byte[] row = r1.getRow();
FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row);
assertNotNull(flowActivityRowKey);
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowId());
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
}
/**
* Writes 1 application entity and checks today's record in the flow activity
* table
*/
@Test
public void testWriteFlowActivityOneFlow() throws Exception {
String cluster = "testWriteFlowActivityOneFlow_cluster1";
String user = "testWriteFlowActivityOneFlow_user1";
String flow = "flow_activity_test_flow_name";
String flowVersion = "A122110F135BC4";
Long runid = 1001111178919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_1111999999_1234";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// check flow activity
checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
}
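// scans the flow activity table from this flow's start row up to a stop row
// derived from cluster + "1", and verifies that exactly one row exists with
// the expected run id column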
private void checkFlowActivityTable(String cluster, String user, String flow,
String flowVersion, Long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
Map<byte[], byte[]> values = result
.getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
rowCount++;
byte[] row = result.getRow();
FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
.parseRowKey(row);
assertNotNull(flowActivityRowKey);
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowId());
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
}
assertEquals(1, rowCount);
}
/**
* Writes 3 applications, each with a different run id and version, for the
* same {cluster, user, flow}.
*
* They should all be inserted into a single record in the flow activity
* table, with 3 columns, one per run id.
*/
@Test
public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
String cluster = "testManyRunsFlowActivity_cluster1";
String user = "testManyRunsFlowActivity_c_user1";
String flow = "flow_activity_test_flow_name";
String flowVersion1 = "A122110F135BC4";
Long runid1 = 11111111111L;
String flowVersion2 = "A12222222222C4";
long runid2 = 2222222222222L;
String flowVersion3 = "A1333333333C4";
long runid3 = 3333333333333L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11888888888_1111";
hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
// write another application to this flow, but with a different run id / version
te = new TimelineEntities();
te.addEntity(entityApp1);
appName = "application_11888888888_2222";
hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
// write another application to this flow, but with a different run id / version
te = new TimelineEntities();
te.addEntity(entityApp1);
appName = "application_11888888888_3333";
hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// check flow activity
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
runid1, flowVersion2, runid2, flowVersion3, runid3);
}
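// all three runs should land in a single flow activity row for the current
// day, with one RUN_ID column per run id, each holding its flow version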
private void checkFlowActivityTableSeveralRuns(String cluster, String user,
String flow, Configuration c1, String flowVersion1, Long runid1,
String flowVersion2, Long runid2, String flowVersion3, Long runid3)
throws IOException {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
.valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
byte[] row = result.getRow();
FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
.parseRowKey(row);
assertNotNull(flowActivityRowKey);
assertEquals(cluster, flowActivityRowKey.getClusterId());
assertEquals(user, flowActivityRowKey.getUserId());
assertEquals(flow, flowActivityRowKey.getFlowId());
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
Map<byte[], byte[]> values = result
.getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
rowCount++;
assertEquals(3, values.size());
checkFlowActivityRunId(runid1, flowVersion1, values);
checkFlowActivityRunId(runid2, flowVersion2, values);
checkFlowActivityRunId(runid3, flowVersion3, values);
}
// the flow activity table inserts into the current day's record;
// hence, if this test runs across the midnight boundary, it may fail,
// since the writes would land in two records, one for each day
assertEquals(1, rowCount);
}
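// the run id column qualifier is the RUN_ID column prefix followed by the
// GenericObjectMapper-encoded run id; the cell value holds the flow version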
private void checkFlowActivityRunId(Long runid, String flowVersion,
Map<byte[], byte[]> values) throws IOException {
byte[] rq = ColumnHelper.getColumnQualifier(
FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
GenericObjectMapper.write(runid));
for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
String actualQ = Bytes.toString(k.getKey());
if (Bytes.toString(rq).equals(actualQ)) {
String actualV = (String) GenericObjectMapper.read(k.getValue());
assertEquals(flowVersion, actualV);
}
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
}


@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests the FlowRun table
*/
public class TestHBaseStorageFlowRun {
private static HBaseTestingUtility util;
private final String metric1 = "MAP_SLOT_MILLIS";
private final String metric2 = "HDFS_BYTES_READ";
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setInt("hfile.format.version", 3);
util.startMiniCluster();
createSchema();
}
private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
/**
* Writes 4 timeline entities belonging to one flow run through the
* {@link HBaseTimelineWriterImpl}
*
* Checks the flow run table contents
*
* The first entity has a created event, metrics and a finished event.
*
* The second entity has a created event and is the entity with the smallest
* start time. This should become the start time of the flow run.
*
* The third entity has a finished event and is the entity with the max end
* time. This should become the end time of the flow run.
*
* The fourth entity has a created event whose start time is greater than the
* min start time.
*
*/
@Test
public void testWriteFlowRunMinMax() throws Exception {
TimelineEntities te = new TimelineEntities();
te.addEntity(TestFlowDataGenerator.getEntity1());
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L;
String appName = "application_100000000000_1111";
long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
.getEntityMinStartTime();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity with the right min start time
te = new TimelineEntities();
te.addEntity(entityMinStartTime);
appName = "application_100000000000_3333";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity for the max end time
TimelineEntity entityMaxEndTime = TestFlowDataGenerator
.getEntityMaxEndTime(endTs);
te = new TimelineEntities();
te.addEntity(entityMaxEndTime);
appName = "application_100000000000_4444";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another entity with a greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
.getEntityGreaterStartTime();
te = new TimelineEntities();
te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// flush everything to hbase
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
Get g = new Get(startRow);
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
Result r1 = table1.get(g);
assertNotNull(r1);
assertTrue(!r1.isEmpty());
Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
assertEquals(2, r1.size());
Long starttime = (Long) GenericObjectMapper.read(values
.get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
Long expmin = entityMinStartTime.getCreatedTime();
assertEquals(expmin, starttime);
assertEquals(endTs, GenericObjectMapper.read(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
}
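// flow run row keys are composed of cluster, user, flow and an inverted
// run id; inverting the run id makes more recent runs sort first in the table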
boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, Long runid) {
byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
assertTrue(rowKeyComponents.length == 4);
assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
assertEquals(user, Bytes.toString(rowKeyComponents[1]));
assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
assertEquals(TimelineWriterUtils.invert(runid),
Bytes.toLong(rowKeyComponents[3]));
return true;
}
/**
* Writes two application entities of the same flow run. Each application has
* two metrics: slot millis and hdfs bytes read. Each metric has values at two
* timestamps.
*
* Checks the metric values of the flow in the flow run table. Each flow-level
* metric value should be the sum of the individual metric values at the
* latest timestamp for that metric.
*/
@Test
public void testWriteFlowRunMetricsOneFlow() throws Exception {
String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
String user = "testWriteFlowRunMetricsOneFlow_user1";
String flow = "testing_flowRun_metrics_flow_name";
String flowVersion = "CF7022C10F1354";
Long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
String appName = "application_11111111111111_1111";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
hbi.flush();
} finally {
if (hbi != null) {
hbi.close();
}
}
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
}
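// the expected flow-level values below (141 for MAP_SLOT_MILLIS, 57 for
// HDFS_BYTES_READ) are the sums, across the two applications written by the
// test, of each metric's value at its latest timestamp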
private void checkFlowRunTable(String cluster, String user, String flow,
long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
rowCount++;
// check metric1
byte[] q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
assertTrue(values.containsKey(q));
assertEquals(141, GenericObjectMapper.read(values.get(q)));
// check metric2
assertEquals(2, values.size());
q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
assertTrue(values.containsKey(q));
assertEquals(57, GenericObjectMapper.read(values.get(q)));
}
assertEquals(1, rowCount);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
}