parent
d1274c3b71
commit
10663b78c8
|
@ -236,7 +236,7 @@
|
|||
</moduleSet>
|
||||
<moduleSet>
|
||||
<includes>
|
||||
<include>org.apache.hadoop:${hbase-server-artifactid}</include>
|
||||
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include>
|
||||
</includes>
|
||||
<binaries>
|
||||
<outputDirectory>share/hadoop/${hadoop.component}/timelineservice</outputDirectory>
|
||||
|
|
|
@ -49,6 +49,9 @@
|
|||
<xerces.jdiff.version>2.11.0</xerces.jdiff.version>
|
||||
|
||||
<kafka.version>0.8.2.1</kafka.version>
|
||||
<hbase.version>1.2.6</hbase.version>
|
||||
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
|
||||
<hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version>
|
||||
|
||||
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
|
||||
<commons-daemon.version>1.0.13</commons-daemon.version>
|
||||
|
@ -404,6 +407,12 @@
|
|||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>
|
||||
|
@ -657,6 +666,7 @@
|
|||
<artifactId>jsp-api</artifactId>
|
||||
<version>2.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.glassfish</groupId>
|
||||
<artifactId>javax.servlet</artifactId>
|
||||
|
@ -1829,64 +1839,6 @@
|
|||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
<!-- The profile for building against HBase 1.2.x
|
||||
This is the default.
|
||||
-->
|
||||
<profile>
|
||||
<id>hbase1</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>!hbase.profile</name>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<hbase.version>1.2.6</hbase.version>
|
||||
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
|
||||
<hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version>
|
||||
<hbase-server-artifactid>hadoop-yarn-server-timelineservice-hbase-server-1</hbase-server-artifactid>
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>${hbase-server-artifactid}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</profile>
|
||||
<!-- The profile for building against HBase 2.0.0.
|
||||
Activate using: mvn -Dhbase.profile=2.0
|
||||
-->
|
||||
<profile>
|
||||
<id>hbase2</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>hbase.profile</name>
|
||||
<value>2.0</value>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<hbase.version>2.0.0-beta-1</hbase.version>
|
||||
<hbase-compatible-hadoop.version>3.0.0</hbase-compatible-hadoop.version>
|
||||
<hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version>
|
||||
<hbase-server-artifactid>hadoop-yarn-server-timelineservice-hbase-server-2</hbase-server-artifactid>
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>${hbase-server-artifactid}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jruby.jcodings</groupId>
|
||||
<artifactId>jcodings</artifactId>
|
||||
<version>1.0.13</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
<repositories>
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-tests</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<name>Apache Hadoop YARN TimelineService HBase tests</name>
|
||||
<name>Apache Hadoop YARN Timeline Service HBase tests</name>
|
||||
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
|
@ -82,6 +82,18 @@
|
|||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
|
@ -402,68 +414,4 @@
|
|||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>hbase1</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>!hbase.profile</name>
|
||||
</property>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>hbase2</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>hbase.profile</name>
|
||||
<value>2.0</value>
|
||||
</property>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-2</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<!-- 'mvn dependency:analyze' fails to detect use of this direct
|
||||
dependency -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs-client</artifactId>
|
||||
<version>${hbase-compatible-hadoop.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- 'mvn dependency:analyze' fails to detect use of this direct
|
||||
dependency -->
|
||||
<!-- This is needed by HBaseTestingUtility -->
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
|
||||
</profiles>
|
||||
</project>
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
|
||||
|
@ -62,7 +64,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
|
|||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -88,7 +89,7 @@ public class TestHBaseStorageFlowRun {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void checkCoProcessorOff() throws Exception, InterruptedException {
|
||||
public void checkCoProcessorOff() throws IOException, InterruptedException {
|
||||
Configuration hbaseConf = util.getConfiguration();
|
||||
TableName table = BaseTableRW.getTableName(hbaseConf,
|
||||
FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME);
|
||||
|
@ -126,9 +127,19 @@ public class TestHBaseStorageFlowRun {
|
|||
}
|
||||
|
||||
private void checkCoprocessorExists(TableName table, boolean exists)
|
||||
throws Exception {
|
||||
throws IOException, InterruptedException {
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(table);
|
||||
HBaseTimelineServerUtils.validateFlowRunCoprocessor(server, table, exists);
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
boolean found = false;
|
||||
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
|
||||
for (String coprocName : coprocs) {
|
||||
if (coprocName.contains("FlowRunCoprocessor")) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
assertEquals(found, exists);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -45,7 +44,9 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
|
@ -326,15 +327,20 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
}
|
||||
|
||||
// check in flow run table
|
||||
TableName flowRunTable = BaseTableRW.getTableName(c1,
|
||||
FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME);
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(flowRunTable);
|
||||
|
||||
// flush and compact all the regions of the primary table
|
||||
int regionNum = HBaseTimelineServerUtils.flushCompactTableRegions(
|
||||
server, flowRunTable);
|
||||
HRegionServer server = util.getRSForFirstRegionInTable(
|
||||
BaseTableRW.getTableName(c1, FlowRunTableRW.TABLE_NAME_CONF_NAME,
|
||||
FlowRunTableRW.DEFAULT_TABLE_NAME));
|
||||
List<Region> regions = server.getOnlineRegions(
|
||||
BaseTableRW.getTableName(c1,
|
||||
FlowRunTableRW.TABLE_NAME_CONF_NAME,
|
||||
FlowRunTableRW.DEFAULT_TABLE_NAME));
|
||||
assertTrue("Didn't find any regions for primary table!",
|
||||
regionNum > 0);
|
||||
regions.size() > 0);
|
||||
// flush and compact all the regions of the primary table
|
||||
for (Region region : regions) {
|
||||
region.flush(true);
|
||||
region.compact(true);
|
||||
}
|
||||
|
||||
// check flow run for one flow many apps
|
||||
checkFlowRunTable(cluster, user, flow, runid, c1, 4);
|
||||
|
@ -386,10 +392,13 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
private FlowScanner getFlowScannerForTestingCompaction() {
|
||||
// create a FlowScanner object with the sole purpose of invoking a process
|
||||
// summation;
|
||||
CompactionRequest request = new CompactionRequest();
|
||||
request.setIsMajor(true, true);
|
||||
// okay to pass in nulls for the constructor arguments
|
||||
// because all we want to do is invoke the process summation
|
||||
FlowScanner fs = new FlowScanner(null, null,
|
||||
FlowScannerOperation.MAJOR_COMPACTION);
|
||||
(request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
|
||||
: FlowScannerOperation.MINOR_COMPACTION));
|
||||
assertNotNull(fs);
|
||||
return fs;
|
||||
}
|
||||
|
@ -414,45 +423,40 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
|
||||
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
Tag t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_1234588888_91188");
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a VERY old timestamp and attribute SUM_FINAL
|
||||
Cell c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
|
||||
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
|
||||
currentColumnCells.add(c1);
|
||||
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_12700000001_29102");
|
||||
tags.add(t);
|
||||
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a recent timestamp and attribute SUM_FINAL
|
||||
Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
|
||||
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
|
||||
currentColumnCells.add(c2);
|
||||
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM.getTagType(),
|
||||
"application_191780000000001_8195");
|
||||
tags.add(t);
|
||||
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a VERY old timestamp but has attribute SUM
|
||||
Cell c3 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
|
||||
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
|
||||
currentColumnCells.add(c3);
|
||||
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM.getTagType(),
|
||||
"application_191780000000001_98104");
|
||||
tags.add(t);
|
||||
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a VERY old timestamp but has attribute SUM
|
||||
Cell c4 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
|
||||
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
|
||||
|
@ -519,12 +523,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
// insert SUM_FINAL cells
|
||||
for (int i = 0; i < count; i++) {
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_123450000" + i + "01_19" + i);
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a VERY old timestamp and attribute SUM_FINAL
|
||||
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
|
||||
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
|
||||
|
@ -535,12 +537,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
// add SUM cells
|
||||
for (int i = 0; i < count; i++) {
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM.getTagType(),
|
||||
"application_1987650000" + i + "83_911" + i);
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with attribute SUM
|
||||
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
|
||||
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
|
||||
|
@ -614,12 +614,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
// insert SUM_FINAL cells which will expire
|
||||
for (int i = 0; i < countFinal; i++) {
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_123450000" + i + "01_19" + i);
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a VERY old timestamp and attribute SUM_FINAL
|
||||
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
|
||||
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
|
||||
|
@ -630,12 +628,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
// insert SUM_FINAL cells which will NOT expire
|
||||
for (int i = 0; i < countFinalNotExpire; i++) {
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_123450000" + i + "01_19" + i);
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with a VERY old timestamp and attribute SUM_FINAL
|
||||
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
|
||||
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
|
||||
|
@ -646,12 +642,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
// add SUM cells
|
||||
for (int i = 0; i < countNotFinal; i++) {
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM.getTagType(),
|
||||
"application_1987650000" + i + "83_911" + i);
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
// create a cell with attribute SUM
|
||||
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
|
||||
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
|
||||
|
@ -703,12 +697,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
long cellValue2 = 28L;
|
||||
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
Tag t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_1234588888_999888");
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
|
||||
|
||||
// create a cell with a VERY old timestamp and attribute SUM_FINAL
|
||||
|
@ -717,11 +709,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
currentColumnCells.add(c1);
|
||||
|
||||
tags = new ArrayList<>();
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM.getTagType(),
|
||||
t = new Tag(AggregationOperation.SUM.getTagType(),
|
||||
"application_100000000001_119101");
|
||||
tags.add(t);
|
||||
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
tagByteArray = Tag.fromList(tags);
|
||||
|
||||
// create a cell with a VERY old timestamp but has attribute SUM
|
||||
Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
|
||||
|
@ -764,12 +755,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
// note down the current timestamp
|
||||
long currentTimestamp = System.currentTimeMillis();
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
Tag t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
"application_123458888888_999888");
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
|
||||
|
||||
// create a cell with a VERY old timestamp
|
||||
|
@ -804,12 +793,10 @@ public class TestHBaseStorageFlowRunCompaction {
|
|||
|
||||
// try for 1 cell with tag SUM
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
Tag t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM.getTagType(),
|
||||
Tag t = new Tag(AggregationOperation.SUM.getTagType(),
|
||||
"application_123458888888_999888");
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
|
||||
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
|
||||
|
||||
|
|
|
@ -40,8 +40,8 @@
|
|||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
@ -93,7 +93,7 @@
|
|||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
|
|
|
@ -1,186 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
|
||||
<name>Apache Hadoop YARN TimelineService HBase Server 1.2</name>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
<yarn.basedir>${project.parent.parent.parent.parent.basedir}</yarn.basedir>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>default</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>default-compile</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>hbase1</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>!hbase.profile</name>
|
||||
</property>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-server</artifactId>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-sslengine</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<descriptor>src/assembly/coprocessor.xml</descriptor>
|
||||
<attach>true</attach>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create-coprocessor-jar</id>
|
||||
<phase>prepare-package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
|
@ -1,38 +0,0 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.01
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
|
||||
http://maven.apache.org/xsd/assembly-1.1.3.xsd">
|
||||
<id>coprocessor</id>
|
||||
<formats>
|
||||
<format>jar</format>
|
||||
</formats>
|
||||
<includeBaseDirectory>false</includeBaseDirectory>
|
||||
<dependencySets>
|
||||
<dependencySet>
|
||||
<outputDirectory>/</outputDirectory>
|
||||
<useProjectArtifact>true</useProjectArtifact>
|
||||
<unpack>true</unpack>
|
||||
<scope>runtime</scope>
|
||||
<includes>
|
||||
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
|
||||
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1</include>
|
||||
</includes>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
||||
</assembly>
|
|
@ -1,194 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-2</artifactId>
|
||||
<name>Apache Hadoop YARN TimelineService HBase Server 2.0</name>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
<yarn.basedir>${project.parent.parent.parent.parent.basedir}</yarn.basedir>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>default</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>default-compile</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>hbase2</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>hbase.profile</name>
|
||||
<value>2.0</value>
|
||||
</property>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- This is to work around the version divergence of
|
||||
org.jruby.jcodings:jcodings pulled in by hbase-client -->
|
||||
<dependency>
|
||||
<groupId>org.jruby.jcodings</groupId>
|
||||
<artifactId>jcodings</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-server</artifactId>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-sslengine</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create-coprocessor-jar</id>
|
||||
<phase>prepare-package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<descriptor>src/assembly/coprocessor.xml</descriptor>
|
||||
<attach>true</attach>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
|
@ -1,224 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||
|
||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A utility class used by hbase-server module.
|
||||
*/
|
||||
public final class HBaseTimelineServerUtils {
|
||||
private HBaseTimelineServerUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link Tag} from the input attribute.
|
||||
*
|
||||
* @param attribute Attribute from which tag has to be fetched.
|
||||
* @return a HBase Tag.
|
||||
*/
|
||||
public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
|
||||
// attribute could be either an Aggregation Operation or
|
||||
// an Aggregation Dimension
|
||||
// Get the Tag type from either
|
||||
AggregationOperation aggOp = AggregationOperation
|
||||
.getAggregationOperation(attribute.getKey());
|
||||
if (aggOp != null) {
|
||||
Tag t = createTag(aggOp.getTagType(), attribute.getValue());
|
||||
return t;
|
||||
}
|
||||
|
||||
AggregationCompactionDimension aggCompactDim =
|
||||
AggregationCompactionDimension.getAggregationCompactionDimension(
|
||||
attribute.getKey());
|
||||
if (aggCompactDim != null) {
|
||||
Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue());
|
||||
return t;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a new cell based on the input cell but with the new value.
|
||||
*
|
||||
* @param origCell Original cell
|
||||
* @param newValue new cell value
|
||||
* @return cell
|
||||
* @throws IOException while creating new cell.
|
||||
*/
|
||||
public static Cell createNewCell(Cell origCell, byte[] newValue)
|
||||
throws IOException {
|
||||
return CellUtil.createCell(CellUtil.cloneRow(origCell),
|
||||
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
|
||||
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a cell with the given inputs.
|
||||
*
|
||||
* @param row row of the cell to be created
|
||||
* @param family column family name of the new cell
|
||||
* @param qualifier qualifier for the new cell
|
||||
* @param ts timestamp of the new cell
|
||||
* @param newValue value of the new cell
|
||||
* @param tags tags in the new cell
|
||||
* @return cell
|
||||
* @throws IOException while creating the cell.
|
||||
*/
|
||||
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
|
||||
long ts, byte[] newValue, byte[] tags) throws IOException {
|
||||
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
|
||||
newValue, tags);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Tag.
|
||||
* @param tagType tag type
|
||||
* @param tag the content of the tag in byte array.
|
||||
* @return an instance of Tag
|
||||
*/
|
||||
public static Tag createTag(byte tagType, byte[] tag) {
|
||||
return new ArrayBackedTag(tagType, tag);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Tag.
|
||||
* @param tagType tag type
|
||||
* @param tag the content of the tag in String.
|
||||
* @return an instance of Tag
|
||||
*/
|
||||
public static Tag createTag(byte tagType, String tag) {
|
||||
return createTag(tagType, Bytes.toBytes(tag));
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a cell to a list of tags.
|
||||
* @param cell the cell to convert
|
||||
* @return a list of tags
|
||||
*/
|
||||
public static List<Tag> convertCellAsTagList(Cell cell) {
|
||||
return TagUtil.asList(
|
||||
cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a list of tags to a byte array.
|
||||
* @param tags the list of tags to convert
|
||||
* @return byte array representation of the list of tags
|
||||
*/
|
||||
public static byte[] convertTagListToByteArray(List<Tag> tags) {
|
||||
return TagUtil.fromList(tags);
|
||||
}
|
||||
|
||||
/**
|
||||
* returns app id from the list of tags.
|
||||
*
|
||||
* @param tags cell tags to be looked into
|
||||
* @return App Id as the AggregationCompactionDimension
|
||||
*/
|
||||
public static String getAggregationCompactionDimension(List<Tag> tags) {
|
||||
String appId = null;
|
||||
for (Tag t : tags) {
|
||||
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
|
||||
.getType()) {
|
||||
appId = Bytes.toString(Tag.cloneValue(t));
|
||||
return appId;
|
||||
}
|
||||
}
|
||||
return appId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the first seen aggregation operation as seen in the list of input
|
||||
* tags or null otherwise.
|
||||
*
|
||||
* @param tags list of HBase tags.
|
||||
* @return AggregationOperation
|
||||
*/
|
||||
public static AggregationOperation getAggregationOperationFromTagsList(
|
||||
List<Tag> tags) {
|
||||
for (AggregationOperation aggOp : AggregationOperation.values()) {
|
||||
for (Tag tag : tags) {
|
||||
if (tag.getType() == aggOp.getTagType()) {
|
||||
return aggOp;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// flush and compact all the regions of the primary table
|
||||
|
||||
/**
|
||||
* Flush and compact all regions of a table.
|
||||
* @param server region server
|
||||
* @param table the table to flush and compact
|
||||
* @return the number of regions flushed and compacted
|
||||
*/
|
||||
public static int flushCompactTableRegions(HRegionServer server,
|
||||
TableName table) throws IOException {
|
||||
List<HRegion> regions = server.getRegions(table);
|
||||
for (HRegion region : regions) {
|
||||
region.flush(true);
|
||||
region.compact(true);
|
||||
}
|
||||
return regions.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the existence of FlowRunCoprocessor in a table.
|
||||
* @param server region server
|
||||
* @param table table to check
|
||||
* @param existenceExpected true if the FlowRunCoprocessor is expected
|
||||
* to be loaded in the table, false otherwise
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void validateFlowRunCoprocessor(HRegionServer server,
|
||||
TableName table, boolean existenceExpected) throws Exception {
|
||||
List<HRegion> regions = server.getRegions(table);
|
||||
for (HRegion region : regions) {
|
||||
boolean found = false;
|
||||
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
|
||||
for (String coprocName : coprocs) {
|
||||
if (coprocName.contains("FlowRunCoprocessor")) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
if (found != existenceExpected) {
|
||||
throw new Exception("FlowRunCoprocessor is" +
|
||||
(existenceExpected ? " not " : " ") + "loaded in table " + table);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains
|
||||
* a set of utility classes used across backend storage reader and writer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -1,285 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Coprocessor for flow run table.
|
||||
*/
|
||||
public class FlowRunCoprocessor implements RegionCoprocessor, RegionObserver {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FlowRunCoprocessor.class);
|
||||
|
||||
private Region region;
|
||||
/**
|
||||
* generate a timestamp that is unique per row in a region this is per region.
|
||||
*/
|
||||
private final TimestampGenerator timestampGenerator =
|
||||
new TimestampGenerator();
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment e) throws IOException {
|
||||
if (e instanceof RegionCoprocessorEnvironment) {
|
||||
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
|
||||
this.region = env.getRegion();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* This method adds the tags onto the cells in the Put. It is presumed that
|
||||
* all the cells in one Put have the same set of Tags. The existing cell
|
||||
* timestamp is overwritten for non-metric cells and each such cell gets a new
|
||||
* unique timestamp generated by {@link TimestampGenerator}
|
||||
*
|
||||
* @see
|
||||
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
|
||||
* .hadoop.hbase.coprocessor.ObserverContext,
|
||||
* org.apache.hadoop.hbase.client.Put,
|
||||
* org.apache.hadoop.hbase.regionserver.wal.WALEdit,
|
||||
* org.apache.hadoop.hbase.client.Durability)
|
||||
*/
|
||||
@Override
|
||||
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
|
||||
WALEdit edit, Durability durability) throws IOException {
|
||||
Map<String, byte[]> attributes = put.getAttributesMap();
|
||||
// Assumption is that all the cells in a put are the same operation.
|
||||
List<Tag> tags = new ArrayList<>();
|
||||
if ((attributes != null) && (attributes.size() > 0)) {
|
||||
for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
|
||||
Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute);
|
||||
if (t != null) {
|
||||
tags.add(t);
|
||||
}
|
||||
}
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
|
||||
Bytes.BYTES_COMPARATOR);
|
||||
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
|
||||
.entrySet()) {
|
||||
List<Cell> newCells = new ArrayList<>(entry.getValue().size());
|
||||
for (Cell cell : entry.getValue()) {
|
||||
// for each cell in the put add the tags
|
||||
// Assumption is that all the cells in
|
||||
// one put are the same operation
|
||||
// also, get a unique cell timestamp for non-metric cells
|
||||
// this way we don't inadvertently overwrite cell versions
|
||||
long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
|
||||
newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
|
||||
CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
|
||||
cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
|
||||
tagByteArray));
|
||||
}
|
||||
newFamilyMap.put(entry.getKey(), newCells);
|
||||
} // for each entry
|
||||
// Update the family map for the Put
|
||||
put.setFamilyCellMap(newFamilyMap);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if the current cell's timestamp is to be used or a new unique
|
||||
* cell timestamp is to be used. The reason this is done is to inadvertently
|
||||
* overwrite cells when writes come in very fast. But for metric cells, the
|
||||
* cell timestamp signifies the metric timestamp. Hence we don't want to
|
||||
* overwrite it.
|
||||
*
|
||||
* @param timestamp
|
||||
* @param tags
|
||||
* @return cell timestamp
|
||||
*/
|
||||
private long getCellTimestamp(long timestamp, List<Tag> tags) {
|
||||
// if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
|
||||
// then use the generator
|
||||
if (timestamp == HConstants.LATEST_TIMESTAMP) {
|
||||
return timestampGenerator.getUniqueTimestamp();
|
||||
} else {
|
||||
return timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* Creates a {@link FlowScanner} Scan so that it can correctly process the
|
||||
* contents of {@link FlowRunTable}.
|
||||
*
|
||||
* @see
|
||||
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
|
||||
* .hadoop.hbase.coprocessor.ObserverContext,
|
||||
* org.apache.hadoop.hbase.client.Get, java.util.List)
|
||||
*/
|
||||
@Override
|
||||
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Get get, List<Cell> results) throws IOException {
|
||||
Scan scan = new Scan(get);
|
||||
scan.setMaxVersions();
|
||||
RegionScanner scanner = null;
|
||||
try {
|
||||
scanner = new FlowScanner(e.getEnvironment(), scan,
|
||||
region.getScanner(scan), FlowScannerOperation.READ);
|
||||
scanner.next(results);
|
||||
e.bypass();
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* Ensures that max versions are set for the Scan so that metrics can be
|
||||
* correctly aggregated and min/max can be correctly determined.
|
||||
*
|
||||
* @see
|
||||
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
|
||||
* .apache.hadoop.hbase.coprocessor.ObserverContext,
|
||||
* org.apache.hadoop.hbase.client.Scan)
|
||||
*/
|
||||
@Override
|
||||
public void preScannerOpen(
|
||||
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
|
||||
throws IOException {
|
||||
// set max versions for scan to see all
|
||||
// versions to aggregate for metrics
|
||||
scan.setMaxVersions();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* Creates a {@link FlowScanner} Scan so that it can correctly process the
|
||||
* contents of {@link FlowRunTable}.
|
||||
*
|
||||
* @see
|
||||
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
|
||||
* org.apache.hadoop.hbase.coprocessor.ObserverContext,
|
||||
* org.apache.hadoop.hbase.client.Scan,
|
||||
* org.apache.hadoop.hbase.regionserver.RegionScanner)
|
||||
*/
|
||||
@Override
|
||||
public RegionScanner postScannerOpen(
|
||||
ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
|
||||
RegionScanner scanner) throws IOException {
|
||||
return new FlowScanner(e.getEnvironment(), scan,
|
||||
scanner, FlowScannerOperation.READ);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalScanner preFlush(
|
||||
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner, FlushLifeCycleTracker cycleTracker)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (store != null) {
|
||||
LOG.debug("preFlush store = " + store.getColumnFamilyName()
|
||||
+ " flushableSize=" + store.getFlushableSize()
|
||||
+ " flushedCellsCount=" + store.getFlushedCellsCount()
|
||||
+ " compactedCellsCount=" + store.getCompactedCellsCount()
|
||||
+ " majorCompactedCellsCount="
|
||||
+ store.getMajorCompactedCellsCount() + " memstoreSize="
|
||||
+ store.getMemStoreSize() + " size=" + store.getSize()
|
||||
+ " storeFilesCount=" + store.getStorefilesCount());
|
||||
}
|
||||
}
|
||||
return new FlowScanner(c.getEnvironment(), scanner,
|
||||
FlowScannerOperation.FLUSH);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (store != null) {
|
||||
LOG.debug("postFlush store = " + store.getColumnFamilyName()
|
||||
+ " flushableSize=" + store.getFlushableSize()
|
||||
+ " flushedCellsCount=" + store.getFlushedCellsCount()
|
||||
+ " compactedCellsCount=" + store.getCompactedCellsCount()
|
||||
+ " majorCompactedCellsCount="
|
||||
+ store.getMajorCompactedCellsCount() + " memstoreSize="
|
||||
+ store.getMemStoreSize() + " size=" + store.getSize()
|
||||
+ " storeFilesCount=" + store.getStorefilesCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalScanner preCompact(
|
||||
ObserverContext<RegionCoprocessorEnvironment> e, Store store,
|
||||
InternalScanner scanner, ScanType scanType,
|
||||
CompactionLifeCycleTracker tracker, CompactionRequest request)
|
||||
throws IOException {
|
||||
|
||||
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
|
||||
if (request != null) {
|
||||
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
|
||||
: FlowScannerOperation.MINOR_COMPACTION);
|
||||
LOG.info("Compactionrequest= " + request.toString() + " "
|
||||
+ requestOp.toString() + " RegionName=" + e.getEnvironment()
|
||||
.getRegion().getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
return new FlowScanner(e.getEnvironment(), scanner, requestOp);
|
||||
}
|
||||
}
|
|
@ -1,723 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
|
||||
* table. Looks through the list of cells per row, checks their tags and does
|
||||
* operation on those cells as per the cell tags. Transforms reads of the stored
|
||||
* metrics into calculated sums for each column Also, finds the min and max for
|
||||
* start and end times in a flow run.
|
||||
*/
|
||||
class FlowScanner implements RegionScanner, Closeable {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FlowScanner.class);
|
||||
|
||||
/**
|
||||
* use a special application id to represent the flow id this is needed since
|
||||
* TimestampGenerator parses the app id to generate a cell timestamp.
|
||||
*/
|
||||
private static final String FLOW_APP_ID = "application_00000000000_0000";
|
||||
|
||||
private final Region region;
|
||||
private final InternalScanner flowRunScanner;
|
||||
private final int batchSize;
|
||||
private final long appFinalValueRetentionThreshold;
|
||||
private RegionScanner regionScanner;
|
||||
private boolean hasMore;
|
||||
private byte[] currentRow;
|
||||
private List<Cell> availableCells = new ArrayList<>();
|
||||
private int currentIndex;
|
||||
private FlowScannerOperation action = FlowScannerOperation.READ;
|
||||
|
||||
FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner,
|
||||
FlowScannerOperation action) {
|
||||
this(env, null, internalScanner, action);
|
||||
}
|
||||
|
||||
FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
|
||||
InternalScanner internalScanner, FlowScannerOperation action) {
|
||||
this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
|
||||
// TODO initialize other scan attributes like Scan#maxResultSize
|
||||
this.flowRunScanner = internalScanner;
|
||||
if (internalScanner instanceof RegionScanner) {
|
||||
this.regionScanner = (RegionScanner) internalScanner;
|
||||
}
|
||||
this.action = action;
|
||||
if (env == null) {
|
||||
this.appFinalValueRetentionThreshold =
|
||||
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
|
||||
this.region = null;
|
||||
} else {
|
||||
this.region = env.getRegion();
|
||||
Configuration hbaseConf = env.getConfiguration();
|
||||
this.appFinalValueRetentionThreshold = hbaseConf.getLong(
|
||||
YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
|
||||
YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(" batch size=" + batchSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
|
||||
*/
|
||||
@Override
|
||||
public HRegionInfo getRegionInfo() {
|
||||
return new HRegionInfo(region.getRegionInfo());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<Cell> cells) throws IOException {
|
||||
return nextRaw(cells, ScannerContext.newBuilder().build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
|
||||
throws IOException {
|
||||
return nextInternal(cells, scannerContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<Cell> cells) throws IOException {
|
||||
return next(cells, ScannerContext.newBuilder().build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<Cell> cells, ScannerContext scannerContext)
|
||||
throws IOException {
|
||||
return nextInternal(cells, scannerContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get value converter associated with a column or a column prefix. If nothing
|
||||
* matches, generic converter is returned.
|
||||
* @param colQualifierBytes
|
||||
* @return value converter implementation.
|
||||
*/
|
||||
private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
|
||||
// Iterate over all the column prefixes for flow run table and get the
|
||||
// appropriate converter for the column qualifier passed if prefix matches.
|
||||
for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
|
||||
byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
|
||||
if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
|
||||
colQualifierBytes, 0, colPrefixBytes.length) == 0) {
|
||||
return colPrefix.getValueConverter();
|
||||
}
|
||||
}
|
||||
// Iterate over all the columns for flow run table and get the
|
||||
// appropriate converter for the column qualifier passed if match occurs.
|
||||
for (FlowRunColumn column : FlowRunColumn.values()) {
|
||||
if (Bytes.compareTo(
|
||||
column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
|
||||
return column.getValueConverter();
|
||||
}
|
||||
}
|
||||
// Return generic converter if nothing matches.
|
||||
return GenericConverter.getInstance();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method loops through the cells in a given row of the
|
||||
* {@link FlowRunTable}. It looks at the tags of each cell to figure out how
|
||||
* to process the contents. It then calculates the sum or min or max for each
|
||||
* column or returns the cell as is.
|
||||
*
|
||||
* @param cells
|
||||
* @param scannerContext
|
||||
* @return true if next row is available for the scanner, false otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
|
||||
throws IOException {
|
||||
Cell cell = null;
|
||||
startNext();
|
||||
// Loop through all the cells in this row
|
||||
// For min/max/metrics we do need to scan the entire set of cells to get the
|
||||
// right one
|
||||
// But with flush/compaction, the number of cells being scanned will go down
|
||||
// cells are grouped per column qualifier then sorted by cell timestamp
|
||||
// (latest to oldest) per column qualifier
|
||||
// So all cells in one qualifier come one after the other before we see the
|
||||
// next column qualifier
|
||||
ByteArrayComparator comp = new ByteArrayComparator();
|
||||
byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
|
||||
AggregationOperation currentAggOp = null;
|
||||
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
|
||||
Set<String> alreadySeenAggDim = new HashSet<>();
|
||||
int addedCnt = 0;
|
||||
long currentTimestamp = System.currentTimeMillis();
|
||||
ValueConverter converter = null;
|
||||
int limit = batchSize;
|
||||
|
||||
while (limit <= 0 || addedCnt < limit) {
|
||||
cell = peekAtNextCell(scannerContext);
|
||||
if (cell == null) {
|
||||
break;
|
||||
}
|
||||
byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
|
||||
if (previousColumnQualifier == null) {
|
||||
// first time in loop
|
||||
previousColumnQualifier = currentColumnQualifier;
|
||||
}
|
||||
|
||||
converter = getValueConverter(currentColumnQualifier);
|
||||
if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
|
||||
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
|
||||
converter, currentTimestamp);
|
||||
resetState(currentColumnCells, alreadySeenAggDim);
|
||||
previousColumnQualifier = currentColumnQualifier;
|
||||
currentAggOp = getCurrentAggOp(cell);
|
||||
converter = getValueConverter(currentColumnQualifier);
|
||||
}
|
||||
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
|
||||
converter, scannerContext);
|
||||
nextCell(scannerContext);
|
||||
}
|
||||
if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) {
|
||||
addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
|
||||
currentTimestamp);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (addedCnt > 0) {
|
||||
LOG.debug("emitted cells. " + addedCnt + " for " + this.action
|
||||
+ " rowKey="
|
||||
+ FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
|
||||
} else {
|
||||
LOG.debug("emitted no cells for " + this.action);
|
||||
}
|
||||
}
|
||||
}
|
||||
return hasMore();
|
||||
}
|
||||
|
||||
private AggregationOperation getCurrentAggOp(Cell cell) {
|
||||
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
|
||||
// We assume that all the operations for a particular column are the same
|
||||
return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
|
||||
}
|
||||
|
||||
/**
|
||||
* resets the parameters to an initialized state for next loop iteration.
|
||||
*/
|
||||
private void resetState(SortedSet<Cell> currentColumnCells,
|
||||
Set<String> alreadySeenAggDim) {
|
||||
currentColumnCells.clear();
|
||||
alreadySeenAggDim.clear();
|
||||
}
|
||||
|
||||
private void collectCells(SortedSet<Cell> currentColumnCells,
|
||||
AggregationOperation currentAggOp, Cell cell,
|
||||
Set<String> alreadySeenAggDim, ValueConverter converter,
|
||||
ScannerContext scannerContext) throws IOException {
|
||||
|
||||
if (currentAggOp == null) {
|
||||
// not a min/max/metric cell, so just return it as is
|
||||
currentColumnCells.add(cell);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (currentAggOp) {
|
||||
case GLOBAL_MIN:
|
||||
if (currentColumnCells.size() == 0) {
|
||||
currentColumnCells.add(cell);
|
||||
} else {
|
||||
Cell currentMinCell = currentColumnCells.first();
|
||||
Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
|
||||
(NumericValueConverter) converter);
|
||||
if (!currentMinCell.equals(newMinCell)) {
|
||||
currentColumnCells.remove(currentMinCell);
|
||||
currentColumnCells.add(newMinCell);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case GLOBAL_MAX:
|
||||
if (currentColumnCells.size() == 0) {
|
||||
currentColumnCells.add(cell);
|
||||
} else {
|
||||
Cell currentMaxCell = currentColumnCells.first();
|
||||
Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
|
||||
(NumericValueConverter) converter);
|
||||
if (!currentMaxCell.equals(newMaxCell)) {
|
||||
currentColumnCells.remove(currentMaxCell);
|
||||
currentColumnCells.add(newMaxCell);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case SUM:
|
||||
case SUM_FINAL:
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("In collect cells "
|
||||
+ " FlowSannerOperation="
|
||||
+ this.action
|
||||
+ " currentAggOp="
|
||||
+ currentAggOp
|
||||
+ " cell qualifier="
|
||||
+ Bytes.toString(CellUtil.cloneQualifier(cell))
|
||||
+ " cell value= "
|
||||
+ converter.decodeValue(CellUtil.cloneValue(cell))
|
||||
+ " timestamp=" + cell.getTimestamp());
|
||||
}
|
||||
|
||||
// only if this app has not been seen yet, add to current column cells
|
||||
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
|
||||
String aggDim = HBaseTimelineServerUtils
|
||||
.getAggregationCompactionDimension(tags);
|
||||
if (!alreadySeenAggDim.contains(aggDim)) {
|
||||
// if this agg dimension has already been seen,
|
||||
// since they show up in sorted order
|
||||
// we drop the rest which are older
|
||||
// in other words, this cell is older than previously seen cells
|
||||
// for that agg dim
|
||||
// but when this agg dim is not seen,
|
||||
// consider this cell in our working set
|
||||
currentColumnCells.add(cell);
|
||||
alreadySeenAggDim.add(aggDim);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
} // end of switch case
|
||||
}
|
||||
|
||||
/*
|
||||
* Processes the cells in input param currentColumnCells and populates
|
||||
* List<Cell> cells as the output based on the input AggregationOperation
|
||||
* parameter.
|
||||
*/
|
||||
private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
|
||||
AggregationOperation currentAggOp, ValueConverter converter,
|
||||
long currentTimestamp) throws IOException {
|
||||
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
|
||||
return 0;
|
||||
}
|
||||
if (currentAggOp == null) {
|
||||
cells.addAll(currentColumnCells);
|
||||
return currentColumnCells.size();
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
|
||||
+ currentColumnCells.size() + " currentAggOp" + currentAggOp);
|
||||
}
|
||||
|
||||
switch (currentAggOp) {
|
||||
case GLOBAL_MIN:
|
||||
case GLOBAL_MAX:
|
||||
cells.addAll(currentColumnCells);
|
||||
return currentColumnCells.size();
|
||||
case SUM:
|
||||
case SUM_FINAL:
|
||||
switch (action) {
|
||||
case FLUSH:
|
||||
case MINOR_COMPACTION:
|
||||
cells.addAll(currentColumnCells);
|
||||
return currentColumnCells.size();
|
||||
case READ:
|
||||
Cell sumCell = processSummation(currentColumnCells,
|
||||
(NumericValueConverter) converter);
|
||||
cells.add(sumCell);
|
||||
return 1;
|
||||
case MAJOR_COMPACTION:
|
||||
List<Cell> finalCells = processSummationMajorCompaction(
|
||||
currentColumnCells, (NumericValueConverter) converter,
|
||||
currentTimestamp);
|
||||
cells.addAll(finalCells);
|
||||
return finalCells.size();
|
||||
default:
|
||||
cells.addAll(currentColumnCells);
|
||||
return currentColumnCells.size();
|
||||
}
|
||||
default:
|
||||
cells.addAll(currentColumnCells);
|
||||
return currentColumnCells.size();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns a cell whose value is the sum of all cell values in the input set.
|
||||
* The new cell created has the timestamp of the most recent metric cell. The
|
||||
* sum of a metric for a flow run is the summation at the point of the last
|
||||
* metric update in that flow till that time.
|
||||
*/
|
||||
private Cell processSummation(SortedSet<Cell> currentColumnCells,
|
||||
NumericValueConverter converter) throws IOException {
|
||||
Number sum = 0;
|
||||
Number currentValue = 0;
|
||||
long ts = 0L;
|
||||
long mostCurrentTimestamp = 0L;
|
||||
Cell mostRecentCell = null;
|
||||
for (Cell cell : currentColumnCells) {
|
||||
currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
|
||||
ts = cell.getTimestamp();
|
||||
if (mostCurrentTimestamp < ts) {
|
||||
mostCurrentTimestamp = ts;
|
||||
mostRecentCell = cell;
|
||||
}
|
||||
sum = converter.add(sum, currentValue);
|
||||
}
|
||||
byte[] sumBytes = converter.encodeValue(sum);
|
||||
Cell sumCell =
|
||||
HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes);
|
||||
return sumCell;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a list of cells that contains
|
||||
*
|
||||
* A) the latest cells for applications that haven't finished yet
|
||||
* B) summation
|
||||
* for the flow, based on applications that have completed and are older than
|
||||
* a certain time
|
||||
*
|
||||
* The new cell created has the timestamp of the most recent metric cell. The
|
||||
* sum of a metric for a flow run is the summation at the point of the last
|
||||
* metric update in that flow till that time.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
List<Cell> processSummationMajorCompaction(
|
||||
SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
|
||||
long currentTimestamp)
|
||||
throws IOException {
|
||||
Number sum = 0;
|
||||
Number currentValue = 0;
|
||||
long ts = 0L;
|
||||
boolean summationDone = false;
|
||||
List<Cell> finalCells = new ArrayList<Cell>();
|
||||
if (currentColumnCells == null) {
|
||||
return finalCells;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("In processSummationMajorCompaction,"
|
||||
+ " will drop cells older than " + currentTimestamp
|
||||
+ " CurrentColumnCells size=" + currentColumnCells.size());
|
||||
}
|
||||
|
||||
for (Cell cell : currentColumnCells) {
|
||||
AggregationOperation cellAggOp = getCurrentAggOp(cell);
|
||||
// if this is the existing flow sum cell
|
||||
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
|
||||
String appId = HBaseTimelineServerUtils
|
||||
.getAggregationCompactionDimension(tags);
|
||||
if (appId == FLOW_APP_ID) {
|
||||
sum = converter.add(sum, currentValue);
|
||||
summationDone = true;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("reading flow app id sum=" + sum);
|
||||
}
|
||||
} else {
|
||||
currentValue = (Number) converter.decodeValue(CellUtil
|
||||
.cloneValue(cell));
|
||||
// read the timestamp truncated by the generator
|
||||
ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
|
||||
if ((cellAggOp == AggregationOperation.SUM_FINAL)
|
||||
&& ((ts + this.appFinalValueRetentionThreshold)
|
||||
< currentTimestamp)) {
|
||||
sum = converter.add(sum, currentValue);
|
||||
summationDone = true;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("MAJOR COMPACTION loop sum= " + sum
|
||||
+ " discarding now: " + " qualifier="
|
||||
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
|
||||
+ converter.decodeValue(CellUtil.cloneValue(cell))
|
||||
+ " timestamp=" + cell.getTimestamp() + " " + this.action);
|
||||
}
|
||||
} else {
|
||||
// not a final value but it's the latest cell for this app
|
||||
// so include this cell in the list of cells to write back
|
||||
finalCells.add(cell);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (summationDone) {
|
||||
Cell anyCell = currentColumnCells.first();
|
||||
List<Tag> tags = new ArrayList<Tag>();
|
||||
Tag t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
Bytes.toBytes(FLOW_APP_ID));
|
||||
tags.add(t);
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationCompactionDimension.APPLICATION_ID.getTagType(),
|
||||
Bytes.toBytes(FLOW_APP_ID));
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
Cell sumCell = HBaseTimelineServerUtils.createNewCell(
|
||||
CellUtil.cloneRow(anyCell),
|
||||
CellUtil.cloneFamily(anyCell),
|
||||
CellUtil.cloneQualifier(anyCell),
|
||||
TimestampGenerator.getSupplementedTimestamp(
|
||||
System.currentTimeMillis(), FLOW_APP_ID),
|
||||
converter.encodeValue(sum), tagByteArray);
|
||||
finalCells.add(sumCell);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
|
||||
+ Bytes.toString(CellUtil.cloneQualifier(sumCell))
|
||||
+ " " + this.action);
|
||||
}
|
||||
LOG.info("After major compaction for qualifier="
|
||||
+ Bytes.toString(CellUtil.cloneQualifier(sumCell))
|
||||
+ " with currentColumnCells.size="
|
||||
+ currentColumnCells.size()
|
||||
+ " returning finalCells.size=" + finalCells.size()
|
||||
+ " with sum=" + sum.longValue()
|
||||
+ " with cell timestamp " + sumCell.getTimestamp());
|
||||
} else {
|
||||
String qualifier = "";
|
||||
LOG.info("After major compaction for qualifier=" + qualifier
|
||||
+ " with currentColumnCells.size="
|
||||
+ currentColumnCells.size()
|
||||
+ " returning finalCells.size=" + finalCells.size()
|
||||
+ " with zero sum="
|
||||
+ sum.longValue());
|
||||
}
|
||||
return finalCells;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines which cell is to be returned based on the values in each cell
|
||||
* and the comparison operation MIN or MAX.
|
||||
*
|
||||
* @param previouslyChosenCell
|
||||
* @param currentCell
|
||||
* @param currentAggOp
|
||||
* @return the cell which is the min (or max) cell
|
||||
* @throws IOException
|
||||
*/
|
||||
private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
|
||||
AggregationOperation currentAggOp, NumericValueConverter converter)
|
||||
throws IOException {
|
||||
if (previouslyChosenCell == null) {
|
||||
return currentCell;
|
||||
}
|
||||
try {
|
||||
Number previouslyChosenCellValue = (Number)converter.decodeValue(
|
||||
CellUtil.cloneValue(previouslyChosenCell));
|
||||
Number currentCellValue = (Number) converter.decodeValue(CellUtil
|
||||
.cloneValue(currentCell));
|
||||
switch (currentAggOp) {
|
||||
case GLOBAL_MIN:
|
||||
if (converter.compare(
|
||||
currentCellValue, previouslyChosenCellValue) < 0) {
|
||||
// new value is minimum, hence return this cell
|
||||
return currentCell;
|
||||
} else {
|
||||
// previously chosen value is miniumum, hence return previous min cell
|
||||
return previouslyChosenCell;
|
||||
}
|
||||
case GLOBAL_MAX:
|
||||
if (converter.compare(
|
||||
currentCellValue, previouslyChosenCellValue) > 0) {
|
||||
// new value is max, hence return this cell
|
||||
return currentCell;
|
||||
} else {
|
||||
// previously chosen value is max, hence return previous max cell
|
||||
return previouslyChosenCell;
|
||||
}
|
||||
default:
|
||||
return currentCell;
|
||||
}
|
||||
} catch (IllegalArgumentException iae) {
|
||||
LOG.error("caught iae during conversion to long ", iae);
|
||||
return currentCell;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (flowRunScanner != null) {
|
||||
flowRunScanner.close();
|
||||
} else {
|
||||
LOG.warn("scanner close called but scanner is null");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called to signal the start of the next() call by the scanner.
|
||||
*/
|
||||
public void startNext() {
|
||||
currentRow = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether or not the underlying scanner has more rows.
|
||||
*/
|
||||
public boolean hasMore() {
|
||||
return currentIndex < availableCells.size() ? true : hasMore;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the next available cell for the current row and advances the
|
||||
* pointer to the next cell. This method can be called multiple times in a row
|
||||
* to advance through all the available cells.
|
||||
*
|
||||
* @param scannerContext
|
||||
* context information for the batch of cells under consideration
|
||||
* @return the next available cell or null if no more cells are available for
|
||||
* the current row
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell nextCell(ScannerContext scannerContext) throws IOException {
|
||||
Cell cell = peekAtNextCell(scannerContext);
|
||||
if (cell != null) {
|
||||
currentIndex++;
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the next available cell for the current row, without advancing the
|
||||
* pointer. Calling this method multiple times in a row will continue to
|
||||
* return the same cell.
|
||||
*
|
||||
* @param scannerContext
|
||||
* context information for the batch of cells under consideration
|
||||
* @return the next available cell or null if no more cells are available for
|
||||
* the current row
|
||||
* @throws IOException if any problem is encountered while grabbing the next
|
||||
* cell.
|
||||
*/
|
||||
public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
|
||||
if (currentIndex >= availableCells.size()) {
|
||||
// done with current batch
|
||||
availableCells.clear();
|
||||
currentIndex = 0;
|
||||
hasMore = flowRunScanner.next(availableCells, scannerContext);
|
||||
}
|
||||
Cell cell = null;
|
||||
if (currentIndex < availableCells.size()) {
|
||||
cell = availableCells.get(currentIndex);
|
||||
if (currentRow == null) {
|
||||
currentRow = CellUtil.cloneRow(cell);
|
||||
} else if (!CellUtil.matchingRow(cell, currentRow)) {
|
||||
// moved on to the next row
|
||||
// don't use the current cell
|
||||
// also signal no more cells for this row
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
|
||||
*/
|
||||
@Override
|
||||
public long getMaxResultSize() {
|
||||
if (regionScanner == null) {
|
||||
throw new IllegalStateException(
|
||||
"RegionScanner.isFilterDone() called when the flow "
|
||||
+ "scanner's scanner is not a RegionScanner");
|
||||
}
|
||||
return regionScanner.getMaxResultSize();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
|
||||
*/
|
||||
@Override
|
||||
public long getMvccReadPoint() {
|
||||
if (regionScanner == null) {
|
||||
throw new IllegalStateException(
|
||||
"RegionScanner.isFilterDone() called when the flow "
|
||||
+ "scanner's internal scanner is not a RegionScanner");
|
||||
}
|
||||
return regionScanner.getMvccReadPoint();
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
|
||||
*/
|
||||
@Override
|
||||
public boolean isFilterDone() throws IOException {
|
||||
if (regionScanner == null) {
|
||||
throw new IllegalStateException(
|
||||
"RegionScanner.isFilterDone() called when the flow "
|
||||
+ "scanner's internal scanner is not a RegionScanner");
|
||||
}
|
||||
return regionScanner.isFilterDone();
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
|
||||
*/
|
||||
@Override
|
||||
public boolean reseek(byte[] bytes) throws IOException {
|
||||
if (regionScanner == null) {
|
||||
throw new IllegalStateException(
|
||||
"RegionScanner.reseek() called when the flow "
|
||||
+ "scanner's internal scanner is not a RegionScanner");
|
||||
}
|
||||
return regionScanner.reseek(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBatch() {
|
||||
return batchSize;
|
||||
}
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
||||
|
||||
|
||||
/**
|
||||
* Identifies the scanner operation on the {@link FlowRunTable}.
|
||||
*/
|
||||
public enum FlowScannerOperation {
|
||||
|
||||
/**
|
||||
* If the scanner is opened for reading
|
||||
* during preGet or preScan.
|
||||
*/
|
||||
READ,
|
||||
|
||||
/**
|
||||
* If the scanner is opened during preFlush.
|
||||
*/
|
||||
FLUSH,
|
||||
|
||||
/**
|
||||
* If the scanner is opened during minor Compaction.
|
||||
*/
|
||||
MINOR_COMPACTION,
|
||||
|
||||
/**
|
||||
* If the scanner is opened during major Compaction.
|
||||
*/
|
||||
MAJOR_COMPACTION
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
|
||||
* contains classes related to implementation for flow related tables, viz. flow
|
||||
* run table and flow activity table.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Package org.apache.hadoop.yarn.server.timelineservice.storage contains
|
||||
* classes which define and implement reading and writing to backend storage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -24,36 +24,138 @@
|
|||
<groupId>org.apache.hadoop</groupId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
|
||||
<name>Apache Hadoop YARN TimelineService HBase Server</name>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<name>Apache Hadoop YARN TimelineService HBase Servers</name>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>hbase1</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>!hbase.profile</name>
|
||||
</property>
|
||||
</activation>
|
||||
<modules>
|
||||
<module>hadoop-yarn-server-timelineservice-hbase-server-1</module>
|
||||
</modules>
|
||||
</profile>
|
||||
<profile>
|
||||
<id>hbase2</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>hbase.profile</name>
|
||||
<value>2.0</value>
|
||||
</property>
|
||||
</activation>
|
||||
<modules>
|
||||
<module>hadoop-yarn-server-timelineservice-hbase-server-2</module>
|
||||
</modules>
|
||||
</profile>
|
||||
</profiles>
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
<yarn.basedir>${project.parent.parent.parent.basedir}</yarn.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-server</artifactId>
|
||||
<scope>provided</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>jetty-sslengine</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<descriptor>src/assembly/coprocessor.xml</descriptor>
|
||||
<attach>true</attach>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create-coprocessor-jar</id>
|
||||
<phase>prepare-package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -16,8 +16,7 @@
|
|||
-->
|
||||
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
|
||||
http://maven.apache.org/xsd/assembly-1.1.3.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
|
||||
<id>coprocessor</id>
|
||||
<formats>
|
||||
<format>jar</format>
|
||||
|
@ -31,7 +30,7 @@
|
|||
<scope>runtime</scope>
|
||||
<includes>
|
||||
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
|
||||
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-2</include>
|
||||
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include>
|
||||
</includes>
|
||||
</dependencySet>
|
||||
</dependencySets>
|
|
@ -20,10 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
|
||||
|
@ -31,7 +28,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOpe
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A utility class used by hbase-server module.
|
||||
|
@ -53,7 +49,7 @@ public final class HBaseTimelineServerUtils {
|
|||
AggregationOperation aggOp = AggregationOperation
|
||||
.getAggregationOperation(attribute.getKey());
|
||||
if (aggOp != null) {
|
||||
Tag t = createTag(aggOp.getTagType(), attribute.getValue());
|
||||
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
|
||||
return t;
|
||||
}
|
||||
|
||||
|
@ -61,7 +57,7 @@ public final class HBaseTimelineServerUtils {
|
|||
AggregationCompactionDimension.getAggregationCompactionDimension(
|
||||
attribute.getKey());
|
||||
if (aggCompactDim != null) {
|
||||
Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue());
|
||||
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
|
||||
return t;
|
||||
}
|
||||
return null;
|
||||
|
@ -100,45 +96,6 @@ public final class HBaseTimelineServerUtils {
|
|||
newValue, tags);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Tag.
|
||||
* @param tagType tag type
|
||||
* @param tag the content of the tag in byte array.
|
||||
* @return an instance of Tag
|
||||
*/
|
||||
public static Tag createTag(byte tagType, byte[] tag) {
|
||||
return new Tag(tagType, tag);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Tag.
|
||||
* @param tagType tag type
|
||||
* @param tag the content of the tag in String.
|
||||
* @return an instance of Tag
|
||||
*/
|
||||
public static Tag createTag(byte tagType, String tag) {
|
||||
return createTag(tagType, Bytes.toBytes(tag));
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a cell to a list of tags.
|
||||
* @param cell the cell to convert
|
||||
* @return a list of tags
|
||||
*/
|
||||
public static List<Tag> convertCellAsTagList(Cell cell) {
|
||||
return Tag.asList(
|
||||
cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a list of tags to a byte array.
|
||||
* @param tags the list of tags to convert
|
||||
* @return byte array representation of the list of tags
|
||||
*/
|
||||
public static byte[] convertTagListToByteArray(List<Tag> tags) {
|
||||
return Tag.fromList(tags);
|
||||
}
|
||||
|
||||
/**
|
||||
* returns app id from the list of tags.
|
||||
*
|
||||
|
@ -175,48 +132,4 @@ public final class HBaseTimelineServerUtils {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// flush and compact all the regions of the primary table
|
||||
|
||||
/**
|
||||
* Flush and compact all regions of a table.
|
||||
* @param server region server
|
||||
* @param table the table to flush and compact
|
||||
* @return the number of regions flushed and compacted
|
||||
*/
|
||||
public static int flushCompactTableRegions(HRegionServer server,
|
||||
TableName table) throws IOException {
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
region.flush(true);
|
||||
region.compact(true);
|
||||
}
|
||||
return regions.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the existence of FlowRunCoprocessor in a table.
|
||||
* @param server region server
|
||||
* @param table table to check
|
||||
* @param existenceExpected true if the FlowRunCoprocessor is expected
|
||||
* to be loaded in the table, false otherwise
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void validateFlowRunCoprocessor(HRegionServer server,
|
||||
TableName table, boolean existenceExpected) throws Exception {
|
||||
List<Region> regions = server.getOnlineRegions(table);
|
||||
for (Region region : regions) {
|
||||
boolean found = false;
|
||||
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
|
||||
for (String coprocName : coprocs) {
|
||||
if (coprocName.contains("FlowRunCoprocessor")) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
if (found != existenceExpected) {
|
||||
throw new Exception("FlowRunCoprocessor is" +
|
||||
(existenceExpected ? " not " : " ") + "loaded in table " + table);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -102,8 +102,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
|
|||
tags.add(t);
|
||||
}
|
||||
}
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
|
||||
Bytes.BYTES_COMPARATOR);
|
||||
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
|
|
@ -247,7 +247,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|||
}
|
||||
|
||||
private AggregationOperation getCurrentAggOp(Cell cell) {
|
||||
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
|
||||
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
|
||||
cell.getTagsLength());
|
||||
// We assume that all the operations for a particular column are the same
|
||||
return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
|
||||
}
|
||||
|
@ -315,7 +316,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|||
}
|
||||
|
||||
// only if this app has not been seen yet, add to current column cells
|
||||
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
|
||||
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
|
||||
cell.getTagsLength());
|
||||
String aggDim = HBaseTimelineServerUtils
|
||||
.getAggregationCompactionDimension(tags);
|
||||
if (!alreadySeenAggDim.contains(aggDim)) {
|
||||
|
@ -452,7 +454,8 @@ class FlowScanner implements RegionScanner, Closeable {
|
|||
for (Cell cell : currentColumnCells) {
|
||||
AggregationOperation cellAggOp = getCurrentAggOp(cell);
|
||||
// if this is the existing flow sum cell
|
||||
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
|
||||
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
|
||||
cell.getTagsLength());
|
||||
String appId = HBaseTimelineServerUtils
|
||||
.getAggregationCompactionDimension(tags);
|
||||
if (appId == FLOW_APP_ID) {
|
||||
|
@ -488,16 +491,13 @@ class FlowScanner implements RegionScanner, Closeable {
|
|||
if (summationDone) {
|
||||
Cell anyCell = currentColumnCells.first();
|
||||
List<Tag> tags = new ArrayList<Tag>();
|
||||
Tag t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationOperation.SUM_FINAL.getTagType(),
|
||||
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
|
||||
Bytes.toBytes(FLOW_APP_ID));
|
||||
tags.add(t);
|
||||
t = HBaseTimelineServerUtils.createTag(
|
||||
AggregationCompactionDimension.APPLICATION_ID.getTagType(),
|
||||
t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
|
||||
Bytes.toBytes(FLOW_APP_ID));
|
||||
tags.add(t);
|
||||
byte[] tagByteArray =
|
||||
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
|
||||
byte[] tagByteArray = Tag.fromList(tags);
|
||||
Cell sumCell = HBaseTimelineServerUtils.createNewCell(
|
||||
CellUtil.cloneRow(anyCell),
|
||||
CellUtil.cloneFamily(anyCell),
|
Loading…
Reference in New Issue