YARN-4062. Add the flush and compaction functionality via coprocessors and scanners for flow run table (Vrushali C via sjlee)

This commit is contained in:
Sangjin Lee 2016-03-17 18:22:04 -07:00
parent 4ba6354717
commit 9bdd455dce
15 changed files with 1363 additions and 87 deletions

View File

@ -1982,6 +1982,22 @@ public class YarnConfiguration extends Configuration {
public static final int public static final int
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
/**
* The name for setting that controls how long the final value of
* a metric of a completed app is retained before merging
* into the flow sum.
*/
public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
TIMELINE_SERVICE_PREFIX
+ "coprocessor.app-final-value-retention-milliseconds";
/**
* The setting that controls how long the final value of a metric
* of a completed app is retained before merging into the flow sum.
*/
public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24
* 60 * 60 * 1000L;
public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS =
TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms"; TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms";

View File

@ -2159,6 +2159,7 @@
<value>604800</value> <value>604800</value>
</property> </property>
<!-- Timeline Service v2 Configuration -->
<property> <property>
<name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name> <name>yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size</name>
<description> <description>
@ -2227,6 +2228,15 @@
<name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name> <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
<value>10</value> <value>10</value>
</property> </property>
<property>
<description> The setting that controls how long the final value
of a metric of a completed app is retained before merging into
the flow sum.</description>
<name>yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds</name>
<value>259200000</value>
</property>
<!-- Shared Cache Configuration --> <!-- Shared Cache Configuration -->
<property> <property>

View File

@ -261,7 +261,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
flowRunId); flowRunId);
storeFlowMetrics(rowKey, metrics, storeFlowMetrics(rowKey, metrics,
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
AggregationOperation.SUM.getAttribute());
} }
} }

View File

@ -24,9 +24,13 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -475,4 +479,55 @@ public final class TimelineStorageUtils {
return (obj instanceof Short) || (obj instanceof Integer) || return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long); (obj instanceof Long);
} }
/**
* creates a new cell based on the input cell but with the new value.
*
* @param origCell Original cell
* @param newValue new cell value
* @return cell
* @throws IOException while creating new cell.
*/
public static Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
/**
* creates a cell with the given inputs.
*
* @param row row of the cell to be created
* @param family column family name of the new cell
* @param qualifier qualifier for the new cell
* @param ts timestamp of the new cell
* @param newValue value of the new cell
* @param tags tags in the new cell
* @return cell
* @throws IOException while creating the cell.
*/
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
long ts, byte[] newValue, byte[] tags) throws IOException {
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
newValue, tags);
}
/**
* returns app id from the list of tags.
*
* @param tags cell tags to be looked into
* @return App Id as the AggregationCompactionDimension
*/
public static String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
return appId;
}
}
return appId;
}
} }

View File

@ -33,7 +33,7 @@ public class TimestampGenerator {
* if this is changed, then reading cell timestamps written with older * if this is changed, then reading cell timestamps written with older
* multiplier value will not work * multiplier value will not work
*/ */
public static final long TS_MULTIPLIER = 1000L; public static final long TS_MULTIPLIER = 1000000L;
private final AtomicLong lastTimestamp = new AtomicLong(); private final AtomicLong lastTimestamp = new AtomicLong();
@ -74,13 +74,14 @@ public class TimestampGenerator {
} }
/** /**
* returns a timestamp multiplied with TS_MULTIPLIER and last few digits of * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
* application id * application id.
* *
* Unlikely scenario of generating a timestamp that is a duplicate: If more * Unlikely scenario of generating a timestamp that is a duplicate: If more
* than a 1000 concurrent apps are running in one flow run AND write to same * than a 1M concurrent apps are running in one flow run AND write to same
* column at the same time, then say appId of 1001 will overlap with appId of * column at the same time, then say appId of 1M and 1 will overlap
* 001 and there may be collisions for that flow run's specific column. * with appId of 001 and there may be collisions for that flow run's
* specific column.
* *
* @param incomingTS Timestamp to be converted. * @param incomingTS Timestamp to be converted.
* @param appId Application Id. * @param appId Application Id.

View File

@ -21,19 +21,19 @@ import org.apache.hadoop.hbase.util.Bytes;
/** /**
* Identifies the attributes to be set for puts into the {@link FlowRunTable}. * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
* The numbers used for tagType are prime numbers * The numbers used for tagType are prime numbers.
*/ */
public enum AggregationOperation { public enum AggregationOperation {
/** /**
* When the flow was started. * When the flow was started.
*/ */
MIN((byte) 71), GLOBAL_MIN((byte) 71),
/** /**
* When it ended. * When it ended.
*/ */
MAX((byte) 73), GLOBAL_MAX((byte) 73),
/** /**
* The metrics of the flow. * The metrics of the flow.
@ -46,9 +46,16 @@ public enum AggregationOperation {
SUM_FINAL((byte) 83), SUM_FINAL((byte) 83),
/** /**
* compact. * Min value as per the latest timestamp
* seen for a given app.
*/ */
COMPACT((byte) 89); LATEST_MIN((byte) 89),
/**
* Max value as per the latest timestamp
* seen for a given app.
*/
LATEST_MAX((byte) 97);
private byte tagType; private byte tagType;
private byte[] inBytes; private byte[] inBytes;

View File

@ -41,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
* application start times. * application start times.
*/ */
MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
AggregationOperation.MIN, LongConverter.getInstance()), AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
/** /**
* When the flow ended. This is the maximum of currently known application end * When the flow ended. This is the maximum of currently known application end
* times. * times.
*/ */
MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
AggregationOperation.MAX, LongConverter.getInstance()), AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
/** /**
* The version of the flow that this flow belongs to. * The version of the flow that this flow belongs to.

View File

@ -40,7 +40,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
/** /**
* To store flow run info values. * To store flow run info values.
*/ */
METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM, METRIC(FlowRunColumnFamily.INFO, "m", null,
LongConverter.getInstance()); LongConverter.getInstance());
private final ColumnHelper<FlowRunTable> column; private final ColumnHelper<FlowRunTable> column;

View File

@ -40,7 +40,12 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
@ -51,7 +56,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen
*/ */
public class FlowRunCoprocessor extends BaseRegionObserver { public class FlowRunCoprocessor extends BaseRegionObserver {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private HRegion region; private HRegion region;
@ -160,8 +164,8 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
scan.setMaxVersions(); scan.setMaxVersions();
RegionScanner scanner = null; RegionScanner scanner = null;
try { try {
scanner = new FlowScanner(region, scan.getBatch(), scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
region.getScanner(scan)); region.getScanner(scan), FlowScannerOperation.READ);
scanner.next(results); scanner.next(results);
e.bypass(); e.bypass();
} finally { } finally {
@ -209,6 +213,64 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
public RegionScanner postScannerOpen( public RegionScanner postScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException { RegionScanner scanner) throws IOException {
return new FlowScanner(region, scan.getBatch(), scanner); return new FlowScanner(e.getEnvironment(), scan.getBatch(),
scanner, FlowScannerOperation.READ);
}
@Override
public InternalScanner preFlush(
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("preFlush store = " + store.getColumnFamilyName()
+ " flushableSize=" + store.getFlushableSize()
+ " flushedCellsCount=" + store.getFlushedCellsCount()
+ " compactedCellsCount=" + store.getCompactedCellsCount()
+ " majorCompactedCellsCount="
+ store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+ store.getMemstoreFlushSize() + " memstoreSize="
+ store.getMemStoreSize() + " size=" + store.getSize()
+ " storeFilesCount=" + store.getStorefilesCount());
}
}
return new FlowScanner(c.getEnvironment(), -1, scanner,
FlowScannerOperation.FLUSH);
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) {
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("postFlush store = " + store.getColumnFamilyName()
+ " flushableSize=" + store.getFlushableSize()
+ " flushedCellsCount=" + store.getFlushedCellsCount()
+ " compactedCellsCount=" + store.getCompactedCellsCount()
+ " majorCompactedCellsCount="
+ store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+ store.getMemstoreFlushSize() + " memstoreSize="
+ store.getMemStoreSize() + " size=" + store.getSize()
+ " storeFilesCount=" + store.getStorefilesCount());
}
}
}
@Override
public InternalScanner preCompact(
ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType, CompactionRequest request)
throws IOException {
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
if (request != null) {
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION);
LOG.info("Compactionrequest= " + request.toString() + " "
+ requestOp.toString() + " RegionName="
+ e.getEnvironment().getRegion().getRegionNameAsString());
}
return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
} }
} }

View File

@ -113,4 +113,20 @@ public class FlowRunRowKey {
TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
return new FlowRunRowKey(clusterId, userId, flowName, flowRunId); return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
} }
/**
* returns the Flow Key as a verbose String output.
* @return String
*/
@Override
public String toString() {
StringBuilder flowKeyStr = new StringBuilder();
flowKeyStr.append("{clusterId=" + clusterId);
flowKeyStr.append(" userId=" + userId);
flowKeyStr.append(" flowName=" + flowName);
flowKeyStr.append(" flowRunId=");
flowKeyStr.append(flowRunId);
flowKeyStr.append("}");
return flowKeyStr.toString();
}
} }

View File

@ -29,20 +29,26 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@ -55,23 +61,42 @@ class FlowScanner implements RegionScanner, Closeable {
private static final Log LOG = LogFactory.getLog(FlowScanner.class); private static final Log LOG = LogFactory.getLog(FlowScanner.class);
/**
* use a special application id to represent the flow id this is needed since
* TimestampGenerator parses the app id to generate a cell timestamp.
*/
private static final String FLOW_APP_ID = "application_00000000000_0000";
private final HRegion region; private final HRegion region;
private final InternalScanner flowRunScanner; private final InternalScanner flowRunScanner;
private RegionScanner regionScanner;
private final int limit; private final int limit;
private final long appFinalValueRetentionThreshold;
private RegionScanner regionScanner;
private boolean hasMore; private boolean hasMore;
private byte[] currentRow; private byte[] currentRow;
private List<Cell> availableCells = new ArrayList<>(); private List<Cell> availableCells = new ArrayList<>();
private int currentIndex; private int currentIndex;
private FlowScannerOperation action = FlowScannerOperation.READ;
FlowScanner(HRegion region, int limit, InternalScanner internalScanner) { FlowScanner(RegionCoprocessorEnvironment env, int limit,
this.region = region; InternalScanner internalScanner, FlowScannerOperation action) {
this.limit = limit; this.limit = limit;
this.flowRunScanner = internalScanner; this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) { if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner; this.regionScanner = (RegionScanner) internalScanner;
} }
// TODO: note if it's compaction/flush this.action = action;
if (env == null) {
this.appFinalValueRetentionThreshold =
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
this.region = null;
} else {
this.region = env.getRegion();
Configuration hbaseConf = env.getConfiguration();
this.appFinalValueRetentionThreshold = hbaseConf.getLong(
YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
}
} }
/* /*
@ -104,17 +129,6 @@ class FlowScanner implements RegionScanner, Closeable {
return nextInternal(cells, cellLimit); return nextInternal(cells, cellLimit);
} }
private String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
}
}
return appId;
}
/** /**
* Get value converter associated with a column or a column prefix. If nothing * Get value converter associated with a column or a column prefix. If nothing
* matches, generic converter is returned. * matches, generic converter is returned.
@ -165,6 +179,7 @@ class FlowScanner implements RegionScanner, Closeable {
* @return true if next row is available for the scanner, false otherwise * @return true if next row is available for the scanner, false otherwise
* @throws IOException * @throws IOException
*/ */
@SuppressWarnings("deprecation")
private boolean nextInternal(List<Cell> cells, int cellLimit) private boolean nextInternal(List<Cell> cells, int cellLimit)
throws IOException { throws IOException {
Cell cell = null; Cell cell = null;
@ -183,14 +198,18 @@ class FlowScanner implements RegionScanner, Closeable {
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
Set<String> alreadySeenAggDim = new HashSet<>(); Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0; int addedCnt = 0;
long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null; ValueConverter converter = null;
while (((cell = peekAtNextCell(cellLimit)) != null) while (cellLimit <= 0 || addedCnt < cellLimit) {
&& (cellLimit <= 0 || addedCnt < cellLimit)) { cell = peekAtNextCell(cellLimit);
if (cell == null) {
break;
}
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
if (converter != null && isNumericConverter(converter)) { if (converter != null && isNumericConverter(converter)) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp, addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
(NumericValueConverter)converter); (NumericValueConverter)converter, currentTimestamp);
} }
resetState(currentColumnCells, alreadySeenAggDim); resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier; currentColumnQualifier = newColumnQualifier;
@ -207,8 +226,17 @@ class FlowScanner implements RegionScanner, Closeable {
nextCell(cellLimit); nextCell(cellLimit);
} }
if (!currentColumnCells.isEmpty()) { if (!currentColumnCells.isEmpty()) {
emitCells(cells, currentColumnCells, currentAggOp, addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
(NumericValueConverter)converter); (NumericValueConverter)converter, currentTimestamp);
if (LOG.isDebugEnabled()) {
if (addedCnt > 0) {
LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+ " rowKey="
+ FlowRunRowKey.parseRowKey(cells.get(0).getRow()).toString());
} else {
LOG.debug("emitted no cells for " + this.action);
}
}
} }
return hasMore(); return hasMore();
} }
@ -247,7 +275,7 @@ class FlowScanner implements RegionScanner, Closeable {
} }
switch (currentAggOp) { switch (currentAggOp) {
case MIN: case GLOBAL_MIN:
if (currentColumnCells.size() == 0) { if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell); currentColumnCells.add(cell);
} else { } else {
@ -260,7 +288,7 @@ class FlowScanner implements RegionScanner, Closeable {
} }
} }
break; break;
case MAX: case GLOBAL_MAX:
if (currentColumnCells.size() == 0) { if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell); currentColumnCells.add(cell);
} else { } else {
@ -275,16 +303,32 @@ class FlowScanner implements RegionScanner, Closeable {
break; break;
case SUM: case SUM:
case SUM_FINAL: case SUM_FINAL:
if (LOG.isTraceEnabled()) {
LOG.trace("In collect cells "
+ " FlowSannerOperation="
+ this.action
+ " currentAggOp="
+ currentAggOp
+ " cell qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(cell))
+ " cell value= "
+ (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+ " timestamp=" + cell.getTimestamp());
}
// only if this app has not been seen yet, add to current column cells // only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength()); cell.getTagsLength());
String aggDim = getAggregationCompactionDimension(tags); String aggDim = TimelineStorageUtils
.getAggregationCompactionDimension(tags);
// If this agg dimension has already been seen, since they show up in
// sorted order, we drop the rest which are older. In other words, this
// cell is older than previously seen cells for that agg dim.
if (!alreadySeenAggDim.contains(aggDim)) { if (!alreadySeenAggDim.contains(aggDim)) {
// Not seen this agg dim, hence consider this cell in our working set // if this agg dimension has already been seen,
// since they show up in sorted order
// we drop the rest which are older
// in other words, this cell is older than previously seen cells
// for that agg dim
// but when this agg dim is not seen,
// consider this cell in our working set
currentColumnCells.add(cell); currentColumnCells.add(cell);
alreadySeenAggDim.add(aggDim); alreadySeenAggDim.add(aggDim);
} }
@ -300,8 +344,8 @@ class FlowScanner implements RegionScanner, Closeable {
* parameter. * parameter.
*/ */
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells, private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, NumericValueConverter converter) AggregationOperation currentAggOp, NumericValueConverter converter,
throws IOException { long currentTimestamp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0; return 0;
} }
@ -309,17 +353,36 @@ class FlowScanner implements RegionScanner, Closeable {
cells.addAll(currentColumnCells); cells.addAll(currentColumnCells);
return currentColumnCells.size(); return currentColumnCells.size();
} }
if (LOG.isTraceEnabled()) {
LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+ currentColumnCells.size() + " currentAggOp" + currentAggOp);
}
switch (currentAggOp) { switch (currentAggOp) {
case MIN: case GLOBAL_MIN:
case MAX: case GLOBAL_MAX:
cells.addAll(currentColumnCells); cells.addAll(currentColumnCells);
return currentColumnCells.size(); return currentColumnCells.size();
case SUM: case SUM:
case SUM_FINAL: case SUM_FINAL:
Cell sumCell = processSummation(currentColumnCells, converter); switch (action) {
cells.add(sumCell); case FLUSH:
return 1; case MINOR_COMPACTION:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case READ:
Cell sumCell = processSummation(currentColumnCells, converter);
cells.add(sumCell);
return 1;
case MAJOR_COMPACTION:
List<Cell> finalCells = processSummationMajorCompaction(
currentColumnCells, converter, currentTimestamp);
cells.addAll(finalCells);
return finalCells.size();
default:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
default: default:
cells.addAll(currentColumnCells); cells.addAll(currentColumnCells);
return currentColumnCells.size(); return currentColumnCells.size();
@ -349,10 +412,122 @@ class FlowScanner implements RegionScanner, Closeable {
sum = converter.add(sum, currentValue); sum = converter.add(sum, currentValue);
} }
byte[] sumBytes = converter.encodeValue(sum); byte[] sumBytes = converter.encodeValue(sum);
Cell sumCell = createNewCell(mostRecentCell, sumBytes); Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell; return sumCell;
} }
/**
* Returns a list of cells that contains
*
* A) the latest cells for applications that haven't finished yet
* B) summation
* for the flow, based on applications that have completed and are older than
* a certain time
*
* The new cell created has the timestamp of the most recent metric cell. The
* sum of a metric for a flow run is the summation at the point of the last
* metric update in that flow till that time.
*/
@VisibleForTesting
List<Cell> processSummationMajorCompaction(
SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
long currentTimestamp)
throws IOException {
Number sum = 0;
Number currentValue = 0;
long ts = 0L;
boolean summationDone = false;
List<Cell> finalCells = new ArrayList<Cell>();
if (currentColumnCells == null) {
return finalCells;
}
if (LOG.isDebugEnabled()) {
LOG.debug("In processSummationMajorCompaction,"
+ " will drop cells older than " + currentTimestamp
+ " CurrentColumnCells size=" + currentColumnCells.size());
}
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String appId = TimelineStorageUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
sum = converter.add(sum, currentValue);
summationDone = true;
if (LOG.isTraceEnabled()) {
LOG.trace("reading flow app id sum=" + sum);
}
} else {
currentValue = (Number) converter.decodeValue(CellUtil
.cloneValue(cell));
// read the timestamp truncated by the generator
ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
if ((cellAggOp == AggregationOperation.SUM_FINAL)
&& ((ts + this.appFinalValueRetentionThreshold)
< currentTimestamp)) {
sum = converter.add(sum, currentValue);
summationDone = true;
if (LOG.isTraceEnabled()) {
LOG.trace("MAJOR COMPACTION loop sum= " + sum
+ " discarding now: " + " qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+ (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+ " timestamp=" + cell.getTimestamp() + " " + this.action);
}
} else {
// not a final value but it's the latest cell for this app
// so include this cell in the list of cells to write back
finalCells.add(cell);
}
}
}
if (summationDone) {
Cell anyCell = currentColumnCells.first();
List<Tag> tags = new ArrayList<Tag>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
Cell sumCell = TimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),
CellUtil.cloneQualifier(anyCell),
TimestampGenerator.getSupplementedTimestamp(
System.currentTimeMillis(), FLOW_APP_ID),
converter.encodeValue(sum), tagByteArray);
finalCells.add(sumCell);
if (LOG.isTraceEnabled()) {
LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+ Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ " " + this.action);
}
LOG.info("After major compaction for qualifier="
+ Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ " with currentColumnCells.size="
+ currentColumnCells.size()
+ " returning finalCells.size=" + finalCells.size()
+ " with sum=" + sum.longValue()
+ " with cell timestamp " + sumCell.getTimestamp());
} else {
String qualifier = "";
LOG.info("After major compaction for qualifier=" + qualifier
+ " with currentColumnCells.size="
+ currentColumnCells.size()
+ " returning finalCells.size=" + finalCells.size()
+ " with zero sum="
+ sum.longValue());
}
return finalCells;
}
/** /**
* Determines which cell is to be returned based on the values in each cell * Determines which cell is to be returned based on the values in each cell
* and the comparison operation MIN or MAX. * and the comparison operation MIN or MAX.
@ -375,7 +550,7 @@ class FlowScanner implements RegionScanner, Closeable {
Number currentCellValue = (Number) converter.decodeValue(CellUtil Number currentCellValue = (Number) converter.decodeValue(CellUtil
.cloneValue(currentCell)); .cloneValue(currentCell));
switch (currentAggOp) { switch (currentAggOp) {
case MIN: case GLOBAL_MIN:
if (converter.compare( if (converter.compare(
currentCellValue, previouslyChosenCellValue) < 0) { currentCellValue, previouslyChosenCellValue) < 0) {
// new value is minimum, hence return this cell // new value is minimum, hence return this cell
@ -384,7 +559,7 @@ class FlowScanner implements RegionScanner, Closeable {
// previously chosen value is miniumum, hence return previous min cell // previously chosen value is miniumum, hence return previous min cell
return previouslyChosenCell; return previouslyChosenCell;
} }
case MAX: case GLOBAL_MAX:
if (converter.compare( if (converter.compare(
currentCellValue, previouslyChosenCellValue) > 0) { currentCellValue, previouslyChosenCellValue) > 0) {
// new value is max, hence return this cell // new value is max, hence return this cell
@ -402,16 +577,13 @@ class FlowScanner implements RegionScanner, Closeable {
} }
} }
private Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
flowRunScanner.close(); if (flowRunScanner != null) {
flowRunScanner.close();
} else {
LOG.warn("scanner close called but scanner is null");
}
} }
/** /**
@ -423,8 +595,6 @@ class FlowScanner implements RegionScanner, Closeable {
/** /**
* Returns whether or not the underlying scanner has more rows. * Returns whether or not the underlying scanner has more rows.
*
* @return true, if there are more cells to return, false otherwise.
*/ */
public boolean hasMore() { public boolean hasMore() {
return currentIndex < availableCells.size() ? true : hasMore; return currentIndex < availableCells.size() ? true : hasMore;
@ -440,8 +610,7 @@ class FlowScanner implements RegionScanner, Closeable {
* fetched by the wrapped scanner * fetched by the wrapped scanner
* @return the next available cell or null if no more cells are available for * @return the next available cell or null if no more cells are available for
* the current row * the current row
* @throws IOException if any problem is encountered while grabbing the next * @throws IOException
* cell.
*/ */
public Cell nextCell(int cellLimit) throws IOException { public Cell nextCell(int cellLimit) throws IOException {
Cell cell = peekAtNextCell(cellLimit); Cell cell = peekAtNextCell(cellLimit);

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
/**
* Identifies the scanner operation on the {@link FlowRunTable}.
*/
public enum FlowScannerOperation {
/**
* If the scanner is opened for reading
* during preGet or preScan.
*/
READ,
/**
* If the scanner is opened during preFlush.
*/
FLUSH,
/**
* If the scanner is opened during minor Compaction.
*/
MINOR_COMPACTION,
/**
* If the scanner is opened during major Compaction.
*/
MAJOR_COMPACTION
}

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.yarn.server.timelineservice.storage.flow; package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -29,17 +28,18 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.conf.Configuration;
/** /**
* Generates the data/entities for the FlowRun and FlowActivity Tables * Generates the data/entities for the FlowRun and FlowActivity Tables
*/ */
class TestFlowDataGenerator { class TestFlowDataGenerator {
private final static String metric1 = "MAP_SLOT_MILLIS"; private static final String metric1 = "MAP_SLOT_MILLIS";
private final static String metric2 = "HDFS_BYTES_READ"; private static final String metric2 = "HDFS_BYTES_READ";
public static final long END_TS_INCR = 10000L;
static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
static TimelineEntity getEntityMetricsApp1() {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test"; String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
@ -53,8 +53,11 @@ class TestFlowDataGenerator {
TimelineMetric m1 = new TimelineMetric(); TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1); m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>(); Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis(); long ts = insertTs;
metricValues.put(ts - 100000, 2L);
for (int k=1; k< 100 ; k++) {
metricValues.put(ts - k*200000, 20L);
}
metricValues.put(ts - 80000, 40L); metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES); m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues); m1.setValues(metricValues);
@ -64,6 +67,86 @@ class TestFlowDataGenerator {
m2.setId(metric2); m2.setId(metric2);
metricValues = new HashMap<Long, Number>(); metricValues = new HashMap<Long, Number>();
ts = System.currentTimeMillis(); ts = System.currentTimeMillis();
for (int k=1; k< 100 ; k++) {
metricValues.put(ts - k*100000, 31L);
}
metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues);
metrics.add(m2);
entity.addMetrics(metrics);
return entity;
}
static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = insertTs;
metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
metricValues = new HashMap<Long, Number>();
ts = insertTs;
metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues);
metrics.add(m2);
entity.addMetrics(metrics);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
event.setTimestamp(insertTs);
event.addInfo("done", "insertTs=" + insertTs);
entity.addEvent(event);
return entity;
}
static TimelineEntity getEntityMetricsApp1(long insertTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = insertTs;
metricValues.put(ts - 100000, 2L);
metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
metricValues = new HashMap<Long, Number>();
ts = insertTs;
metricValues.put(ts - 100000, 31L); metricValues.put(ts - 100000, 31L);
metricValues.put(ts - 80000, 57L); metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES); m2.setType(Type.TIME_SERIES);
@ -74,7 +157,8 @@ class TestFlowDataGenerator {
return entity; return entity;
} }
static TimelineEntity getEntityMetricsApp2() {
static TimelineEntity getEntityMetricsApp2(long insertTs) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test"; String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString(); String type = TimelineEntityType.YARN_APPLICATION.toString();
@ -87,7 +171,7 @@ class TestFlowDataGenerator {
TimelineMetric m1 = new TimelineMetric(); TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1); m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>(); Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis(); long ts = insertTs;
metricValues.put(ts - 100000, 5L); metricValues.put(ts - 100000, 5L);
metricValues.put(ts - 80000, 101L); metricValues.put(ts - 80000, 101L);
m1.setType(Type.TIME_SERIES); m1.setType(Type.TIME_SERIES);
@ -140,6 +224,55 @@ class TestFlowDataGenerator {
return entity; return entity;
} }
static TimelineEntity getAFullEntity(long ts, long endTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunFullEntity";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
entity.setCreatedTime(ts);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 120000, 100000000L);
metricValues.put(ts - 100000, 200000000L);
metricValues.put(ts - 80000, 300000000L);
metricValues.put(ts - 60000, 400000000L);
metricValues.put(ts - 40000, 50000000000L);
metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
metricValues = new HashMap<Long, Number>();
metricValues.put(ts - 900000, 31L);
metricValues.put(ts - 30000, 57L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues);
metrics.add(m2);
entity.addMetrics(metrics);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(ts);
String expKey = "foo_event";
Object expVal = "test";
event.addInfo(expKey, expVal);
entity.addEvent(event);
event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
long expTs = ts + 21600000;// start time + 6hrs
event.setTimestamp(expTs);
event.addInfo(expKey, expVal);
entity.addEvent(event);
return entity;
}
static TimelineEntity getEntityGreaterStartTime(long startTs) { static TimelineEntity getEntityGreaterStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
entity.setCreatedTime(startTs); entity.setCreatedTime(startTs);
@ -184,6 +317,34 @@ class TestFlowDataGenerator {
return entity; return entity;
} }
static TimelineEntity getMinFlushEntity(long startTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunHelloFlushEntityMin";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
entity.setCreatedTime(startTs);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(startTs);
entity.addEvent(event);
return entity;
}
static TimelineEntity getMaxFlushEntity(long startTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunHelloFlushEntityMax";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
entity.setCreatedTime(startTs);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
event.setTimestamp(startTs + END_TS_INCR);
entity.addEvent(event);
return entity;
}
static TimelineEntity getFlowApp1(long appCreatedTime) { static TimelineEntity getFlowApp1(long appCreatedTime) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
@ -203,5 +364,4 @@ class TestFlowDataGenerator {
return entity; return entity;
} }
} }

View File

@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -216,7 +216,8 @@ public class TestHBaseStorageFlowRun {
long runid = 1002345678919L; long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); TimelineEntity entityApp1 = TestFlowDataGenerator
.getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1); te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
@ -228,7 +229,8 @@ public class TestHBaseStorageFlowRun {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
appName = "application_11111111111111_2222"; appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@ -323,7 +325,8 @@ public class TestHBaseStorageFlowRun {
long runid = 1002345678919L; long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); TimelineEntity entityApp1 = TestFlowDataGenerator
.getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1); te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
@ -335,7 +338,8 @@ public class TestHBaseStorageFlowRun {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
appName = "application_11111111111111_2222"; appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@ -420,7 +424,8 @@ public class TestHBaseStorageFlowRun {
long runid = 1002345678919L; long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities(); TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); TimelineEntity entityApp1 = TestFlowDataGenerator
.getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1); te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null; HBaseTimelineWriterImpl hbi = null;
@ -432,7 +437,8 @@ public class TestHBaseStorageFlowRun {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow // write another application with same metric to this flow
te = new TimelineEntities(); te = new TimelineEntities();
TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); TimelineEntity entityApp2 = TestFlowDataGenerator
.getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2); te.addEntity(entityApp2);
appName = "application_11111111111111_2222"; appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te); hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@ -494,6 +500,98 @@ public class TestHBaseStorageFlowRun {
} }
} }
@Test
public void testWriteFlowRunFlush() throws Exception {
String cluster = "atestFlushFlowRun_cluster1";
String user = "atestFlushFlowRun__user1";
String flow = "atestFlushFlowRun_flow_name";
String flowVersion = "AF1021C19F1351";
long runid = 1449526652000L;
int start = 10;
int count = 20000;
int appIdSuffix = 1;
HBaseTimelineWriterImpl hbi = null;
long insertTs = 1449796654827L - count;
long minTS = insertTs + 1;
long startTs = insertTs;
Configuration c1 = util.getConfiguration();
TimelineEntities te1 = null;
TimelineEntity entityApp1 = null;
TimelineEntity entityApp2 = null;
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
for (int i = start; i < count; i++) {
String appName = "application_1060350000000_" + appIdSuffix;
insertTs++;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
te1.addEntity(entityApp1);
entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
te1.addEntity(entityApp2);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
Thread.sleep(1);
appName = "application_1001199480000_7" + appIdSuffix;
insertTs++;
appIdSuffix++;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
te1.addEntity(entityApp1);
entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
te1.addEntity(entityApp2);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
if (i % 1000 == 0) {
hbi.flush();
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
runid, false);
}
}
} finally {
hbi.flush();
hbi.close();
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
true);
}
}
private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
int count, String cluster, String user, String flow, long runid,
boolean checkMax) throws IOException {
Connection conn = ConnectionFactory.createConnection(c1);
// check in flow run table
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
// scan the table and see that we get back the right min and max
// timestamps
byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
Get g = new Get(startRow);
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
Result r1 = table1.get(g);
assertNotNull(r1);
assertTrue(!r1.isEmpty());
Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
int start = 10;
assertEquals(2, r1.size());
long starttime = Bytes.toLong(values
.get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
assertEquals(minTS, starttime);
if (checkMax) {
assertEquals(startTs + 2 * (count - start)
+ TestFlowDataGenerator.END_TS_INCR,
Bytes.toLong(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
}
}
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster(); util.shutdownMiniCluster();

View File

@ -0,0 +1,635 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
import java.util.Map;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/**
* Tests the FlowRun and FlowActivity Tables
*/
public class TestHBaseStorageFlowRunCompaction {
private static HBaseTestingUtility util;
private final String metric1 = "MAP_SLOT_MILLIS";
private final String metric2 = "HDFS_BYTES_READ";
private final byte[] aRowKey = Bytes.toBytes("a");
private final byte[] aFamily = Bytes.toBytes("family");
private final byte[] aQualifier = Bytes.toBytes("qualifier");
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setInt("hfile.format.version", 3);
util.startMiniCluster();
createSchema();
}
private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void testWriteFlowRunCompaction() throws Exception {
String cluster = "kompaction_cluster1";
String user = "kompaction_FlowRun__user1";
String flow = "kompaction_flowRun_flow_name";
String flowVersion = "AF1021C19F1351";
long runid = 1449526652000L;
int start = 10;
int count = 2000;
int appIdSuffix = 1;
HBaseTimelineWriterImpl hbi = null;
long insertTs = System.currentTimeMillis() - count;
Configuration c1 = util.getConfiguration();
TimelineEntities te1 = null;
TimelineEntity entityApp1 = null;
try {
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
// now insert count * ( 100 + 100) metrics
// each call to getEntityMetricsApp1 brings back 100 values
// of metric1 and 100 of metric2
for (int i = start; i < start + count; i++) {
String appName = "application_10240000000000_" + appIdSuffix;
insertTs++;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
te1.addEntity(entityApp1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
appName = "application_2048000000000_7" + appIdSuffix;
insertTs++;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
te1.addEntity(entityApp1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
}
} finally {
String appName = "application_10240000000000_" + appIdSuffix;
te1 = new TimelineEntities();
entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete(
insertTs + 1, c1);
te1.addEntity(entityApp1);
hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
hbi.flush();
hbi.close();
}
// check in flow run table
HRegionServer server = util.getRSForFirstRegionInTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
List<HRegion> regions = server.getOnlineRegions(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
// flush and compact all the regions of the primary table
for (HRegion region : regions) {
region.flushcache();
region.compactStores(true);
}
// check flow run for one flow many apps
checkFlowRunTable(cluster, user, flow, runid, c1, 3);
}
private void checkFlowRunTable(String cluster, String user, String flow,
long runid, Configuration c1, int valueCount) throws IOException {
Scan s = new Scan();
s.addFamily(FlowRunColumnFamily.INFO.getBytes());
byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
s.setStartRow(startRow);
String clusterStop = cluster + "1";
byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
s.setStopRow(stopRow);
Connection conn = ConnectionFactory.createConnection(c1);
Table table1 = conn.getTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
ResultScanner scanner = table1.getScanner(s);
int rowCount = 0;
for (Result result : scanner) {
assertNotNull(result);
assertTrue(!result.isEmpty());
Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
.getBytes());
assertEquals(valueCount, values.size());
rowCount++;
// check metric1
byte[] q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
assertTrue(values.containsKey(q));
assertEquals(141, Bytes.toLong(values.get(q)));
// check metric2
q = ColumnHelper.getColumnQualifier(
FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
assertTrue(values.containsKey(q));
assertEquals(57, Bytes.toLong(values.get(q)));
}
assertEquals(1, rowCount);
}
private FlowScanner getFlowScannerForTestingCompaction() {
// create a FlowScanner object with the sole purpose of invoking a process
// summation;
CompactionRequest request = new CompactionRequest();
request.setIsMajor(true, true);
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, -1, null,
(request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION));
assertNotNull(fs);
return fs;
}
@Test
public void checkProcessSummationMoreCellsSumFinal2()
throws IOException {
long cellValue1 = 1236L;
long cellValue2 = 28L;
long cellValue3 = 1236L;
long cellValue4 = 1236L;
FlowScanner fs = getFlowScannerForTestingCompaction();
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
long cell1Ts = 1200120L;
long cell2Ts = TimestampGenerator.getSupplementedTimestamp(
System.currentTimeMillis(),"application_123746661110_11202");
long cell3Ts = 1277719L;
long cell4Ts = currentTimestamp - 10;
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_91188");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_12700000001_29102");
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_8195");
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_98104");
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
currentColumnCells.add(c4);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
// we should be getting back 4 cells
// one is the flow sum cell
// two are the cells with SUM attribute
// one cell with SUM_FINAL
assertEquals(4, cells.size());
for (int i = 0; i < cells.size(); i++) {
Cell returnedCell = cells.get(0);
assertNotNull(returnedCell);
long returnTs = returnedCell.getTimestamp();
long returnValue = Bytes.toLong(CellUtil
.cloneValue(returnedCell));
if (returnValue == cellValue2) {
assertTrue(returnTs == cell2Ts);
} else if (returnValue == cellValue3) {
assertTrue(returnTs == cell3Ts);
} else if (returnValue == cellValue4) {
assertTrue(returnTs == cell4Ts);
} else if (returnValue == cellValue1) {
assertTrue(returnTs != cell1Ts);
assertTrue(returnTs > cell1Ts);
assertTrue(returnTs >= currentTimestamp);
} else {
// raise a failure since we expect only these two values back
Assert.fail();
}
}
}
// tests with many cells
// of type SUM and SUM_FINAL
// all cells of SUM_FINAL will expire
@Test
public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
int count = 200000;
long cellValueFinal = 1000L;
long cellValueNotFinal = 28L;
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
long cellTsFinalStart = 10001120L;
long cellTsFinal = cellTsFinalStart;
long cellTsNotFinalStart = currentTimestamp - 5;
long cellTsNotFinal = cellTsNotFinalStart;
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
List<Tag> tags = null;
Tag t = null;
Cell c1 = null;
// insert SUM_FINAL cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
}
// add SUM cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
}
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
// we should be getting back count + 1 cells
// one is the flow sum cell
// others are the cells with SUM attribute
assertEquals(count + 1, cells.size());
for (int i = 0; i < cells.size(); i++) {
Cell returnedCell = cells.get(0);
assertNotNull(returnedCell);
long returnTs = returnedCell.getTimestamp();
long returnValue = Bytes.toLong(CellUtil
.cloneValue(returnedCell));
if (returnValue == (count * cellValueFinal)) {
assertTrue(returnTs > (cellTsFinalStart + count));
assertTrue(returnTs >= currentTimestamp);
} else if ((returnValue >= cellValueNotFinal)
&& (returnValue <= cellValueNotFinal * count)) {
assertTrue(returnTs >= cellTsNotFinalStart);
assertTrue(returnTs <= cellTsNotFinalStart * count);
} else {
// raise a failure since we expect only these values back
Assert.fail();
}
}
}
// tests with many cells
// of type SUM and SUM_FINAL
// NOT cells of SUM_FINAL will expire
@Test
public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
int countFinal = 20100;
int countNotFinal = 1000;
int countFinalNotExpire = 7009;
long cellValueFinal = 1000L;
long cellValueNotFinal = 28L;
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
long cellTsFinalStart = 10001120L;
long cellTsFinal = cellTsFinalStart;
long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp(
System.currentTimeMillis(), "application_10266666661166_118821");
long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
long cellTsNotFinalStart = currentTimestamp - 5;
long cellTsNotFinal = cellTsNotFinalStart;
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
List<Tag> tags = null;
Tag t = null;
Cell c1 = null;
// insert SUM_FINAL cells which will expire
for (int i = 0; i < countFinal; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinal++;
}
// insert SUM_FINAL cells which will NOT expire
for (int i = 0; i < countFinalNotExpire; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsFinalNotExpire++;
}
// add SUM cells
for (int i = 0; i < countNotFinal; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
// create a cell with attribute SUM
c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
currentColumnCells.add(c1);
cellTsNotFinal++;
}
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
// we should be getting back
// countNotFinal + countFinalNotExpire + 1 cells
// one is the flow sum cell
// count = the cells with SUM attribute
// count = the cells with SUM_FINAL attribute but not expired
assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size());
for (int i = 0; i < cells.size(); i++) {
Cell returnedCell = cells.get(0);
assertNotNull(returnedCell);
long returnTs = returnedCell.getTimestamp();
long returnValue = Bytes.toLong(CellUtil
.cloneValue(returnedCell));
if (returnValue == (countFinal * cellValueFinal)) {
assertTrue(returnTs > (cellTsFinalStart + countFinal));
assertTrue(returnTs >= currentTimestamp);
} else if (returnValue == cellValueNotFinal) {
assertTrue(returnTs >= cellTsNotFinalStart);
assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal);
} else if (returnValue == cellValueFinal){
assertTrue(returnTs >= cellTsFinalStartNotExpire);
assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire);
} else {
// raise a failure since we expect only these values back
Assert.fail();
}
}
}
@Test
public void testProcessSummationMoreCellsSumFinal() throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
long cellValue1 = 1236L;
long cellValue2 = 28L;
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
120L, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
"application_100000000001_119101");
tags.add(t);
tagByteArray = Tag.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
130L, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
// we should be getting back two cells
// one is the flow sum cell
// another is the cell with SUM attribute
assertEquals(2, cells.size());
Cell returnedCell = cells.get(0);
assertNotNull(returnedCell);
long inputTs1 = c1.getTimestamp();
long inputTs2 = c2.getTimestamp();
long returnTs = returnedCell.getTimestamp();
long returnValue = Bytes.toLong(CellUtil
.cloneValue(returnedCell));
// the returned Ts will be far greater than input ts as well as the noted
// current timestamp
if (returnValue == cellValue2) {
assertTrue(returnTs == inputTs2);
} else if (returnValue == cellValue1) {
assertTrue(returnTs >= currentTimestamp);
assertTrue(returnTs != inputTs1);
} else {
// raise a failure since we expect only these two values back
Assert.fail();
}
}
@Test
public void testProcessSummationOneCellSumFinal() throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123458888888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
120L, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
// we should not get the same cell back
// but we get back the flow cell
assertEquals(1, cells.size());
Cell returnedCell = cells.get(0);
// it's NOT the same cell
assertNotEquals(c1, returnedCell);
long inputTs = c1.getTimestamp();
long returnTs = returnedCell.getTimestamp();
// the returned Ts will be far greater than input ts as well as the noted
// current timestamp
assertTrue(returnTs > inputTs);
assertTrue(returnTs >= currentTimestamp);
}
@Test
public void testProcessSummationOneCell() throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
// try for 1 cell with tag SUM
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM.getTagType(),
"application_123458888888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
currentColumnCells.add(c1);
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
// we expect the same cell back
assertEquals(1, cells.size());
Cell c2 = cells.get(0);
assertEquals(c1, c2);
assertEquals(currentTimestamp, c2.getTimestamp());
}
@Test
public void testProcessSummationEmpty() throws IOException {
FlowScanner fs = getFlowScannerForTestingCompaction();
long currentTimestamp = System.currentTimeMillis();
SortedSet<Cell> currentColumnCells = null;
List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
assertEquals(0, cells.size());
currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
cells = fs.processSummationMajorCompaction(currentColumnCells,
LongConverter.getInstance(), currentTimestamp);
assertNotNull(cells);
assertEquals(0, cells.size());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
}