YARN-8003. Backport the code structure changes in YARN-7346 to branch-2. Contributed by Haibo Chen.

This commit is contained in:
Rohith Sharma K S 2018-03-07 12:41:36 +05:30
parent a5d9691b54
commit 207daabbcc
16 changed files with 334 additions and 212 deletions

View File

@@ -222,7 +222,7 @@
</moduleSet>
<moduleSet>
<includes>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1</include>
</includes>
<binaries>
<outputDirectory>share/hadoop/${hadoop.component}/timelineservice</outputDirectory>

View File

@@ -52,6 +52,7 @@
<hbase.version>1.2.6</hbase.version>
<hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
<hbase-compatible-guava.version>12.0.1</hbase-compatible-guava.version>
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
@ -361,7 +362,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
<version>${project.version}</version>
</dependency>
@ -610,7 +611,6 @@
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.servlet</artifactId>

View File

@@ -28,7 +28,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-tests</artifactId>
<version>2.10.0-SNAPSHOT</version>
<name>Apache Hadoop YARN Timeline Service HBase tests</name>
<name>Apache Hadoop YARN TimelineService HBase tests</name>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
@ -85,7 +85,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@ -64,6 +62,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -89,7 +88,7 @@ public class TestHBaseStorageFlowRun {
}
@Test
public void checkCoProcessorOff() throws IOException, InterruptedException {
public void checkCoProcessorOff() throws Exception, InterruptedException {
Configuration hbaseConf = util.getConfiguration();
TableName table = BaseTableRW.getTableName(hbaseConf,
FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME);
@ -127,19 +126,9 @@ public class TestHBaseStorageFlowRun {
}
private void checkCoprocessorExists(TableName table, boolean exists)
throws IOException, InterruptedException {
throws Exception {
HRegionServer server = util.getRSForFirstRegionInTable(table);
List<Region> regions = server.getOnlineRegions(table);
for (Region region : regions) {
boolean found = false;
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
for (String coprocName : coprocs) {
if (coprocName.contains("FlowRunCoprocessor")) {
found = true;
}
}
assertEquals(found, exists);
}
HBaseTimelineServerUtils.validateFlowRunCoprocessor(server, table, exists);
}
/**

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -44,9 +45,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@ -327,20 +326,15 @@ public class TestHBaseStorageFlowRunCompaction {
}
// check in flow run table
HRegionServer server = util.getRSForFirstRegionInTable(
BaseTableRW.getTableName(c1, FlowRunTableRW.TABLE_NAME_CONF_NAME,
FlowRunTableRW.DEFAULT_TABLE_NAME));
List<Region> regions = server.getOnlineRegions(
BaseTableRW.getTableName(c1,
FlowRunTableRW.TABLE_NAME_CONF_NAME,
FlowRunTableRW.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
TableName flowRunTable = BaseTableRW.getTableName(c1,
FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME);
HRegionServer server = util.getRSForFirstRegionInTable(flowRunTable);
// flush and compact all the regions of the primary table
for (Region region : regions) {
region.flush(true);
region.compact(true);
}
int regionNum = HBaseTimelineServerUtils.flushCompactTableRegions(
server, flowRunTable);
assertTrue("Didn't find any regions for primary table!",
regionNum > 0);
// check flow run for one flow many apps
checkFlowRunTable(cluster, user, flow, runid, c1, 4);
@ -392,13 +386,10 @@ public class TestHBaseStorageFlowRunCompaction {
private FlowScanner getFlowScannerForTestingCompaction() {
// create a FlowScanner object with the sole purpose of invoking a process
// summation;
CompactionRequest request = new CompactionRequest();
request.setIsMajor(true, true);
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, null,
(request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION));
FlowScannerOperation.MAJOR_COMPACTION);
assertNotNull(fs);
return fs;
}
@ -423,40 +414,45 @@ public class TestHBaseStorageFlowRunCompaction {
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_91188");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_12700000001_29102");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_191780000000001_8195");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_191780000000001_98104");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
@ -523,10 +519,12 @@ public class TestHBaseStorageFlowRunCompaction {
// insert SUM_FINAL cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@ -537,10 +535,12 @@ public class TestHBaseStorageFlowRunCompaction {
// add SUM cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@ -614,10 +614,12 @@ public class TestHBaseStorageFlowRunCompaction {
// insert SUM_FINAL cells which will expire
for (int i = 0; i < countFinal; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@ -628,10 +630,12 @@ public class TestHBaseStorageFlowRunCompaction {
// insert SUM_FINAL cells which will NOT expire
for (int i = 0; i < countFinalNotExpire; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
@ -642,10 +646,12 @@ public class TestHBaseStorageFlowRunCompaction {
// add SUM cells
for (int i = 0; i < countNotFinal; i++) {
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@ -697,10 +703,12 @@ public class TestHBaseStorageFlowRunCompaction {
long cellValue2 = 28L;
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
@ -709,10 +717,11 @@ public class TestHBaseStorageFlowRunCompaction {
currentColumnCells.add(c1);
tags = new ArrayList<>();
t = new Tag(AggregationOperation.SUM.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_100000000001_119101");
tags.add(t);
tagByteArray = Tag.fromList(tags);
tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily,
@ -755,10 +764,12 @@ public class TestHBaseStorageFlowRunCompaction {
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
"application_123458888888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
@ -793,10 +804,12 @@ public class TestHBaseStorageFlowRunCompaction {
// try for 1 cell with tag SUM
List<Tag> tags = new ArrayList<>();
Tag t = new Tag(AggregationOperation.SUM.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM.getTagType(),
"application_123458888888_999888");
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);

View File

@ -40,8 +40,8 @@
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
@ -92,7 +92,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<artifactId>hadoop-yarn-server-common</artifactId>
<scope>provided</scope>
</dependency>

View File

@ -0,0 +1,155 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>2.10.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId>
<name>Apache Hadoop YARN TimelineService HBase Server 1.2</name>
<version>2.10.0-SNAPSHOT</version>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptor>src/assembly/coprocessor.xml</descriptor>
<attach>true</attach>
</configuration>
<executions>
<execution>
<id>create-coprocessor-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -16,7 +16,8 @@
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3
http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>coprocessor</id>
<formats>
<format>jar</format>
@ -30,7 +31,7 @@
<scope>runtime</scope>
<includes>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1</include>
</includes>
</dependencySet>
</dependencySets>

View File

@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
@ -28,6 +31,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOpe
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A utility class used by hbase-server module.
@ -49,7 +53,7 @@ public final class HBaseTimelineServerUtils {
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
Tag t = createTag(aggOp.getTagType(), attribute.getValue());
return t;
}
@ -57,7 +61,7 @@ public final class HBaseTimelineServerUtils {
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
@ -96,6 +100,45 @@ public final class HBaseTimelineServerUtils {
newValue, tags);
}
/**
* Create a Tag.
* @param tagType tag type
* @param tag the content of the tag in byte array.
* @return an instance of Tag
*/
public static Tag createTag(byte tagType, byte[] tag) {
  // Single construction point for HBase Tags so the HBase-version-specific
  // Tag API is isolated to this utility class.
  final Tag created = new Tag(tagType, tag);
  return created;
}
/**
* Create a Tag.
* @param tagType tag type
* @param tag the content of the tag in String.
* @return an instance of Tag
*/
public static Tag createTag(byte tagType, String tag) {
  // Encode the String payload to bytes, then reuse the byte[] overload so
  // Tag construction stays in one place.
  final byte[] tagContent = Bytes.toBytes(tag);
  return createTag(tagType, tagContent);
}
/**
* Convert a cell to a list of tags.
* @param cell the cell to convert
* @return a list of tags
*/
public static List<Tag> convertCellAsTagList(Cell cell) {
  // Pull out the tag region of the cell's backing buffer and materialize
  // it as Tag objects via HBase's parser.
  final byte[] backingArray = cell.getTagsArray();
  final int tagsOffset = cell.getTagsOffset();
  final int tagsLength = cell.getTagsLength();
  return Tag.asList(backingArray, tagsOffset, tagsLength);
}
/**
* Convert a list of tags to a byte array.
* @param tags the list of tags to convert
* @return byte array representation of the list of tags
*/
public static byte[] convertTagListToByteArray(List<Tag> tags) {
  // Delegate serialization to HBase's own Tag helper; wrapping it here
  // keeps callers off the version-specific Tag API.
  final byte[] serialized = Tag.fromList(tags);
  return serialized;
}
/**
* returns app id from the list of tags.
*
@ -132,4 +175,48 @@ public final class HBaseTimelineServerUtils {
}
return null;
}
// flush and compact all the regions of the primary table
/**
* Flush and compact all regions of a table.
* @param server region server
* @param table the table to flush and compact
* @return the number of regions flushed and compacted
*/
public static int flushCompactTableRegions(HRegionServer server,
    TableName table) throws IOException {
  // Snapshot the regions of this table currently online on the server,
  // then force a flush followed by a compaction on each of them.
  final List<Region> onlineRegions = server.getOnlineRegions(table);
  for (Region onlineRegion : onlineRegions) {
    onlineRegion.flush(true);
    onlineRegion.compact(true);
  }
  // Report how many regions were processed so callers can sanity-check
  // that the table actually had regions.
  return onlineRegions.size();
}
/**
 * Check the existence of FlowRunCoprocessor in a table.
 * @param server region server
 * @param table table to check
 * @param existenceExpected true if the FlowRunCoprocessor is expected
 *                          to be loaded in the table, false otherwise
 * @throws Exception if the observed coprocessor state of any region of
 *                   the table does not match {@code existenceExpected}
 */
public static void validateFlowRunCoprocessor(HRegionServer server,
    TableName table, boolean existenceExpected) throws Exception {
  List<Region> regions = server.getOnlineRegions(table);
  // Every online region of the table must agree with the expectation.
  for (Region region : regions) {
    boolean found = false;
    Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
    for (String coprocName : coprocs) {
      if (coprocName.contains("FlowRunCoprocessor")) {
        found = true;
        // One match is sufficient; no need to scan the remaining names.
        break;
      }
    }
    if (found != existenceExpected) {
      throw new Exception("FlowRunCoprocessor is" +
          (existenceExpected ? " not " : " ") + "loaded in table " + table);
    }
  }
}
}

View File

@ -102,7 +102,8 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
tags.add(t);
}
}
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()

View File

@ -247,8 +247,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
private AggregationOperation getCurrentAggOp(Cell cell) {
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
// We assume that all the operations for a particular column are the same
return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
}
@ -316,8 +315,7 @@ class FlowScanner implements RegionScanner, Closeable {
}
// only if this app has not been seen yet, add to current column cells
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
String aggDim = HBaseTimelineServerUtils
.getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
@ -454,8 +452,7 @@ class FlowScanner implements RegionScanner, Closeable {
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell);
String appId = HBaseTimelineServerUtils
.getAggregationCompactionDimension(tags);
if (appId == FLOW_APP_ID) {
@ -491,13 +488,16 @@ class FlowScanner implements RegionScanner, Closeable {
if (summationDone) {
Cell anyCell = currentColumnCells.first();
List<Tag> tags = new ArrayList<Tag>();
Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
Tag t = HBaseTimelineServerUtils.createTag(
AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
t = HBaseTimelineServerUtils.createTag(
AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
byte[] tagByteArray = Tag.fromList(tags);
byte[] tagByteArray =
HBaseTimelineServerUtils.convertTagListToByteArray(tags);
Cell sumCell = HBaseTimelineServerUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),

View File

@ -24,138 +24,14 @@
<groupId>org.apache.hadoop</groupId>
<version>2.10.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId>
<name>Apache Hadoop YARN TimelineService HBase Server</name>
<version>2.10.0-SNAPSHOT</version>
<name>Apache Hadoop YARN TimelineService HBase Servers</name>
<packaging>pom</packaging>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptor>src/assembly/coprocessor.xml</descriptor>
<attach>true</attach>
</configuration>
<executions>
<execution>
<id>create-coprocessor-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<modules>
<module>hadoop-yarn-server-timelineservice-hbase-server-1</module>
</modules>
</project>