YARN-4986. Add a check in the coprocessor for the table to be operated on (Vrushali C via sjlee)

Sangjin Lee 2016-04-29 17:13:32 -07:00
parent 39cce4e629
commit 69dc561b61
6 changed files with 160 additions and 11 deletions

org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java

@@ -32,8 +32,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Result;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Fiel
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
@@ -887,4 +890,21 @@ public final class TimelineStorageUtils {
Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
entity.addEvents(eventsSet);
}
public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
Configuration conf) {
String regionTableName = hRegionInfo.getTable().getNameAsString();
String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
FlowRunTable.DEFAULT_TABLE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("regionTableName=" + regionTableName);
}
if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
if (LOG.isDebugEnabled()) {
LOG.debug(" table is the flow run table!! " + flowRunTableName);
}
return true;
}
return false;
}
}
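
For illustration, a minimal sketch of how the new helper behaves: it resolves the flow run table name from FlowRunTable.TABLE_NAME_CONF_NAME (falling back to FlowRunTable.DEFAULT_TABLE_NAME) and compares it, case-insensitively, against the region's table. The standalone driver below, including the HRegionInfo construction (an HBase 1.x API), is an assumption for illustration only and not part of this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

public class IsFlowRunTableSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the flow run table name, as a deployment might.
    conf.set(FlowRunTable.TABLE_NAME_CONF_NAME, "prod.timelineservice.flowrun");

    HRegionInfo flowRunRegion =
        new HRegionInfo(TableName.valueOf("prod.timelineservice.flowrun"));
    HRegionInfo entityRegion =
        new HRegionInfo(TableName.valueOf(EntityTable.DEFAULT_TABLE_NAME));

    // true: the region belongs to the configured flow run table
    System.out.println(TimelineStorageUtils.isFlowRunTable(flowRunRegion, conf));
    // false: any other table, e.g. the entity table
    System.out.println(TimelineStorageUtils.isFlowRunTable(entityRegion, conf));
  }
}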

org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java

@@ -84,7 +84,7 @@ public class EntityTable extends BaseTable<EntityTable> {
+ ".table.metrics.ttl";
/** default value for entity table name. */
- private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
/** default TTL is 30 days for metrics timeseries. */
private static final int DEFAULT_METRICS_TTL = 2592000;

org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen
public class FlowRunCoprocessor extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private boolean isFlowRunRegion = false;
private HRegion region;
/**
@@ -70,9 +71,15 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
this.region = env.getRegion();
isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
region.getRegionInfo(), env.getConfiguration());
}
}
public boolean isFlowRunRegion() {
return isFlowRunRegion;
}
/*
* (non-Javadoc)
*
@@ -93,6 +100,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
WALEdit edit, Durability durability) throws IOException {
Map<String, byte[]> attributes = put.getAttributesMap();
if (!isFlowRunRegion) {
return;
}
// Assumption is that all the cells in a put are the same operation.
List<Tag> tags = new ArrayList<>();
if ((attributes != null) && (attributes.size() > 0)) {
@@ -160,6 +170,10 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
Get get, List<Cell> results) throws IOException {
if (!isFlowRunRegion) {
return;
}
Scan scan = new Scan(get);
scan.setMaxVersions();
RegionScanner scanner = null;
@@ -190,11 +204,14 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
@Override
public RegionScanner preScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-     RegionScanner s) throws IOException {
-   // set max versions for scan to see all
-   // versions to aggregate for metrics
-   scan.setMaxVersions();
-   return s;
+     RegionScanner scanner) throws IOException {
+   if (isFlowRunRegion) {
+     // set max versions for scan to see all
+     // versions to aggregate for metrics
+     scan.setMaxVersions();
+   }
+   return scanner;
}
/*
@@ -213,6 +230,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
public RegionScanner postScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException {
if (!isFlowRunRegion) {
return scanner;
}
return new FlowScanner(e.getEnvironment(), scan.getBatch(),
scanner, FlowScannerOperation.READ);
}
@@ -221,6 +241,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
public InternalScanner preFlush(
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
if (!isFlowRunRegion) {
return scanner;
}
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -241,6 +264,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) {
if (!isFlowRunRegion) {
return;
}
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("postFlush store = " + store.getColumnFamilyName()
@@ -262,6 +288,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
InternalScanner scanner, ScanType scanType, CompactionRequest request)
throws IOException {
if (!isFlowRunRegion) {
return scanner;
}
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
if (request != null) {
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION

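For context, a RegionObserver runs on whichever regions it is attached to, so every hook above now consults the isFlowRunRegion flag computed once in start() and degrades to a no-op pass-through elsewhere. Below is a minimal sketch of attaching the observer to a single table via the HBase 1.x HTableDescriptor API; the attachment mechanics shown are an assumption for illustration, not part of this change.

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

public class AttachObserverSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical table name; real deployments resolve it from configuration.
    HTableDescriptor desc =
        new HTableDescriptor(TableName.valueOf("timelineservice.flowrun"));
    // Register the observer on this table only; HBase instantiates it per
    // region and calls start(), which records whether the region belongs
    // to the flow run table.
    desc.addCoprocessor(
        "org.apache.hadoop.yarn.server.timelineservice.storage.flow"
            + ".FlowRunCoprocessor");
    System.out.println(desc);
  }
}
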
org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java

@@ -210,7 +210,7 @@ class FlowScanner implements RegionScanner, Closeable {
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
if (converter != null && isNumericConverter(converter)) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-     (NumericValueConverter)converter, currentTimestamp);
+     converter, currentTimestamp);
}
resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier;
@@ -219,6 +219,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
// No operation needs to be performed on non-numeric converters.
if (!isNumericConverter(converter)) {
currentColumnCells.add(cell);
nextCell(cellLimit);
continue;
}
@@ -228,7 +229,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
if (!currentColumnCells.isEmpty()) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-     (NumericValueConverter)converter, currentTimestamp);
+     converter, currentTimestamp);
if (LOG.isDebugEnabled()) {
if (addedCnt > 0) {
LOG.debug("emitted cells. " + addedCnt + " for " + this.action
@@ -345,7 +346,7 @@ class FlowScanner implements RegionScanner, Closeable {
* parameter.
*/
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-     AggregationOperation currentAggOp, NumericValueConverter converter,
+     AggregationOperation currentAggOp, ValueConverter converter,
long currentTimestamp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0;
@@ -372,12 +373,14 @@ class FlowScanner implements RegionScanner, Closeable {
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case READ:
- Cell sumCell = processSummation(currentColumnCells, converter);
+ Cell sumCell = processSummation(currentColumnCells,
+     (NumericValueConverter) converter);
cells.add(sumCell);
return 1;
case MAJOR_COMPACTION:
List<Cell> finalCells = processSummationMajorCompaction(
-     currentColumnCells, converter, currentTimestamp);
+     currentColumnCells, (NumericValueConverter) converter,
+         currentTimestamp);
cells.addAll(finalCells);
return finalCells.size();
default:

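The FlowScanner changes above widen emitCells() to accept any ValueConverter and defer the NumericValueConverter cast to the branches that actually aggregate (READ and MAJOR_COMPACTION), so cells stored with a non-numeric converter are now carried through untouched. Below is a self-contained sketch of that dispatch pattern, using hypothetical stand-in types rather than the real converter hierarchy.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the ValueConverter / NumericValueConverter split.
interface Converter { }
interface NumericConverter extends Converter {
  long sum(List<Long> values);
}

public class ConverterDispatchSketch {

  // Mirrors the emitCells() change: aggregate only when the converter
  // supports numeric math; otherwise carry the cells through unmodified.
  static List<Long> emit(List<Long> cells, Converter converter) {
    if (!(converter instanceof NumericConverter)) {
      return new ArrayList<Long>(cells);   // pass-through, no summation
    }
    NumericConverter numeric = (NumericConverter) converter;  // safe cast
    List<Long> out = new ArrayList<Long>();
    out.add(numeric.sum(cells));           // single aggregated cell
    return out;
  }

  public static void main(String[] args) {
    NumericConverter summing = new NumericConverter() {
      @Override
      public long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
          total += v;
        }
        return total;
      }
    };
    List<Long> cells = Arrays.asList(1L, 2L, 3L);
    System.out.println(emit(cells, summing));             // [6]
    System.out.println(emit(cells, new Converter() { })); // [1, 2, 3]
  }
}
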
org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java

@@ -19,18 +19,21 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -57,6 +62,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -84,6 +91,60 @@ public class TestHBaseStorageFlowRun {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void checkCoProcessorOff() throws IOException, InterruptedException {
Configuration hbaseConf = util.getConfiguration();
TableName table = TableName.valueOf(hbaseConf.get(
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = ConnectionFactory.createConnection(hbaseConf);
Admin admin = conn.getAdmin();
if (admin == null) {
throw new IOException("Can't check tables since admin is null");
}
if (admin.tableExists(table)) {
// check the regions.
// check in flow run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<HRegion> regions = server.getOnlineRegions(table);
for (HRegion region : regions) {
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
}
table = TableName.valueOf(hbaseConf.get(
FlowActivityTable.TABLE_NAME_CONF_NAME,
FlowActivityTable.DEFAULT_TABLE_NAME));
if (admin.tableExists(table)) {
// check the regions.
// check in flow activity table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<HRegion> regions = server.getOnlineRegions(table);
for (HRegion region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
}
table = TableName.valueOf(hbaseConf.get(
EntityTable.TABLE_NAME_CONF_NAME,
EntityTable.DEFAULT_TABLE_NAME));
if (admin.tableExists(table)) {
// check the regions.
// check in entity table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<HRegion> regions = server.getOnlineRegions(table);
for (HRegion region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
}
}
/**
* Writes 4 timeline entities belonging to one flow run through the
* {@link HBaseTimelineWriterImpl}

org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -87,6 +89,40 @@ public class TestHBaseStorageFlowRunCompaction {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
/**
* Writes non-numeric data into the flow run table and reads it back.
*
* @throws Exception
*/
@Test
public void testWriteNonNumericData() throws Exception {
String rowKey = "nonNumericRowKey";
String column = "nonNumericColumnName";
String value = "nonNumericValue";
byte[] rowKeyBytes = Bytes.toBytes(rowKey);
byte[] columnNameBytes = Bytes.toBytes(column);
byte[] valueBytes = Bytes.toBytes(value);
Put p = new Put(rowKeyBytes);
p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes,
valueBytes);
Configuration hbaseConf = util.getConfiguration();
TableName table = TableName.valueOf(hbaseConf.get(
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
Connection conn = ConnectionFactory.createConnection(hbaseConf);
Table flowRunTable = conn.getTable(table);
flowRunTable.put(p);
Get g = new Get(rowKeyBytes);
Result r = flowRunTable.get(g);
assertNotNull(r);
assertTrue(r.size() >= 1);
Cell actualValue = r.getColumnLatestCell(
FlowRunColumnFamily.INFO.getBytes(), columnNameBytes);
assertNotNull(CellUtil.cloneValue(actualValue));
assertEquals(value, Bytes.toString(CellUtil.cloneValue(actualValue)));
}
@Test
public void testWriteFlowRunCompaction() throws Exception {
String cluster = "kompaction_cluster1";