HBASE-18175 Add hbase-spark integration test into hbase-spark-it

* adds module hbase-spark-it
* adds test IntegrationTestSparkBulkLoad
* adds resultant jar to bin assembly

Signed-off-by: Mike Drob <mdrob@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>
Yi Liang 2017-07-12 17:12:52 -07:00 committed by Sean Busbey
parent 81ffd6a13e
commit 79a702d112
5 changed files with 1030 additions and 0 deletions


@@ -269,6 +269,12 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark-it</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
</dependencies>
<profiles>


@@ -56,6 +56,7 @@
<include>org.apache.hbase:hbase-shaded</include>
<include>org.apache.hbase:hbase-shell</include>
<include>org.apache.hbase:hbase-spark</include>
<include>org.apache.hbase:hbase-spark-it</include>
<include>org.apache.hbase:hbase-testing-util</include>
<include>org.apache.hbase:hbase-thrift</include>
</includes>

hbase-spark-it/pom.xml (new file)

@@ -0,0 +1,361 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase</artifactId>
<groupId>org.apache.hbase</groupId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>hbase-spark-it</artifactId>
<name>Apache HBase - Spark Integration Tests</name>
<description>Integration and System tests for HBase</description>
<properties>
<spark.version>1.6.0</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<!-- Test inclusion patterns used by failsafe configuration -->
<unittest.include>**/Test*.java</unittest.include>
<integrationtest.include>**/IntegrationTest*.java</integrationtest.include>
<!-- To run tests with a particular Xmx value, use -Dfailsafe.Xmx=XXXg -->
<failsafe.Xmx>4g</failsafe.Xmx>
<!-- To run a single integration test, use -Dit.test=IntegrationTestXXX -->
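<!-- For example, from within this module: mvn verify -Dit.test=IntegrationTestSparkBulkLoad -->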
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Make a jar and put the sources in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<!--Make it so assembly:single does nothing in here-->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<skipAssembly>true</skipAssembly>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire.version}</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit4</artifactId>
<version>${surefire.version}</version>
</dependency>
</dependencies>
<configuration>
<includes>
<include>${integrationtest.include}</include>
</includes>
<excludes>
<exclude>${unittest.include}</exclude>
<exclude>**/*$*</exclude>
</excludes>
<redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
<failIfNoTests>false</failIfNoTests>
<testFailureIgnore>false</testFailureIgnore>
</configuration>
<executions>
<execution>
<id>integration-test</id>
<phase>integration-test</phase>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<phase>verify</phase>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- Run integration tests with mvn verify -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skip>false</skip>
<forkMode>always</forkMode>
<!-- TODO: failsafe does time out, but verify does not fail the build because of the timeout.
I believe it is a failsafe bug; we may consider using surefire -->
<forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
<argLine>-enableassertions -Xmx${failsafe.Xmx}
-Djava.security.egd=file:/dev/./urandom -XX:+CMSClassUnloadingEnabled
-verbose:gc -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<!-- purposefully have jsr 305 exclusion only warn in this module -->
<execution>
<id>banned-jsr305</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<fail>false</fail>
</configuration>
</execution>
<!-- hbase-spark is ok in this module -->
<execution>
<id>banned-hbase-spark</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
<execution>
<id>banned-scala</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<skip>true</skip>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>create-mrapp-generated-classpath</id>
<phase>generate-test-resources</phase>
<goals>
<goal>build-classpath</goal>
</goals>
<configuration>
<!-- needed by the tests: generates the classpath that is
required in the env of the launch container
in the mini cluster
-->
<outputFile>${project.build.directory}/test-classes/spark-generated-classpath</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Intra-project dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-rsgroup</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-rsgroup</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>${compat.module}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</dependency>
<!-- Hadoop needs Netty 3.x at test scope for the minicluster -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${netty.hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<!-- make sure wrong scala version is not pulled in -->
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<!-- make sure wrong scala version is not pulled in -->
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Skip the tests in this module -->
<profile>
<id>skipIntegrationTests</id>
<activation>
<property>
<name>skipIntegrationTests</name>
</property>
</activation>
<properties>
<skipTests>true</skipTests>
</properties>
</profile>
</profiles>
<reporting>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>2.7.2</version>
<reportSets>
<reportSet>
<id>spark-integration-tests</id>
<reports>
<report>report-only</report>
</reports>
<configuration>
<outputName>failsafe-report</outputName>
<reportsDirectories>
<reportsDirectory>${project.build.directory}/failsafe-reports</reportsDirectory>
</reportsDirectories>
</configuration>
</reportSet>
</reportSets>
</plugin>
</plugins>
</reporting>
</project>

hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java (new file)

@@ -0,0 +1,661 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark;
import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SerializableWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.junit.Test;
import scala.Tuple2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
/**
* Test Bulk Load and Spark on a distributed cluster.
* It starts a Spark job that creates linked chains.
* This test mimics {@link IntegrationTestBulkLoad} in mapreduce.
*
* Usage on cluster:
* spark-submit --class org.apache.hadoop.hbase.spark.IntegrationTestSparkBulkLoad
* HBASE_HOME/lib/hbase-spark-it-XXX-tests.jar -m slowDeterministic -Dhbase.spark.bulkload.chainlength=300
*/
public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
private static final Log LOG = LogFactory.getLog(IntegrationTestSparkBulkLoad.class);
// The number of partitions for randomly generated data
private static String BULKLOAD_PARTITIONS_NUM = "hbase.spark.bulkload.partitionsnum";
private static int DEFAULT_BULKLOAD_PARTITIONS_NUM = 3;
private static String BULKLOAD_CHAIN_LENGTH = "hbase.spark.bulkload.chainlength";
private static int DEFAULT_BULKLOAD_CHAIN_LENGTH = 200000;
private static String BULKLOAD_IMPORT_ROUNDS = "hbase.spark.bulkload.importround";
private static int DEFAULT_BULKLOAD_IMPORT_ROUNDS = 1;
private static String CURRENT_ROUND_NUM = "hbase.spark.bulkload.current.roundnum";
private static String NUM_REPLICA_COUNT_KEY = "hbase.spark.bulkload.replica.countkey";
private static int DEFAULT_NUM_REPLICA_COUNT = 1;
private static String BULKLOAD_TABLE_NAME = "hbase.spark.bulkload.tableName";
private static String DEFAULT_BULKLOAD_TABLE_NAME = "IntegrationTestSparkBulkLoad";
private static String BULKLOAD_OUTPUT_PATH = "hbase.spark.bulkload.output.path";
private static final String OPT_LOAD = "load";
private static final String OPT_CHECK = "check";
private boolean load = false;
private boolean check = false;
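// Column families used by each chain row:
//   L (CHAIN_FAM): qualifier = chainId, value = row key of the next link in the chain
//   S (SORT_FAM):  qualifier = chainId, value = this link's position within the chain
//   D (DATA_FAM):  qualifier = chainId, value = random payload data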
private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
private static final byte[] SORT_FAM = Bytes.toBytes("S");
private static final byte[] DATA_FAM = Bytes.toBytes("D");
/**
* Runs a Spark job to load data into the HBase table.
*/
public void runLoad() throws Exception {
setupTable();
int numImportRounds = getConf().getInt(BULKLOAD_IMPORT_ROUNDS, DEFAULT_BULKLOAD_IMPORT_ROUNDS);
LOG.info("Running load with numIterations:" + numImportRounds);
for (int i = 0; i < numImportRounds; i++) {
runLinkedListSparkJob(i);
}
}
/**
* Runs a Spark job that creates a linked list for testing.
* @param iteration the iteration number of this job
* @throws Exception
*/
public void runLinkedListSparkJob(int iteration) throws Exception {
String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + " _load " +
EnvironmentEdgeManager.currentTime();
LOG.info("Running iteration " + iteration + "in Spark Job");
Path output = null;
if (conf.get(BULKLOAD_OUTPUT_PATH) == null) {
output = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
} else {
output = new Path(conf.get(BULKLOAD_OUTPUT_PATH));
}
SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
Configuration hbaseConf = new Configuration(getConf());
hbaseConf.setInt(CURRENT_ROUND_NUM, iteration);
int partitionNum = hbaseConf.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf);
LOG.info("Partition RDD into " + partitionNum + " parts");
List<String> temp = new ArrayList<>();
JavaRDD<List<byte[]>> rdd = jsc.parallelize(temp, partitionNum).
mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)), false);
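// Write the generated records as HFiles under the output path; ListToKeyValueFunc converts each
// List<byte[]> record into the (row/family/qualifier, value) pair expected by bulkLoad.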
hbaseContext.bulkLoad(rdd, getTablename(), new ListToKeyValueFunc(), output.toUri().getPath(),
new HashMap<>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
Table table = conn.getTable(getTablename());
RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
// Create a new loader.
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
// Load the HFiles into table.
loader.doBulkLoad(output, admin, table, regionLocator);
}
// Delete the files.
util.getTestFileSystem().delete(output, true);
jsc.close();
}
// See mapreduce.IntegrationTestBulkLoad#LinkedListCreationMapper
// Used to generate test data
public static class LinkedListCreationMapper implements
Function2<Integer, Iterator<String>, Iterator<List<byte[]>>> {
SerializableWritable swConfig = null;
private Random rand = new Random();
public LinkedListCreationMapper(SerializableWritable conf) {
this.swConfig = conf;
}
@Override
public Iterator<List<byte[]>> call(Integer v1, Iterator v2) throws Exception {
Configuration config = (Configuration) swConfig.value();
int partitionId = v1.intValue();
LOG.info("Starting create List in Partition " + partitionId);
int partitionNum = config.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM);
int chainLength = config.getInt(BULKLOAD_CHAIN_LENGTH, DEFAULT_BULKLOAD_CHAIN_LENGTH);
int iterationsNum = config.getInt(BULKLOAD_IMPORT_ROUNDS, DEFAULT_BULKLOAD_IMPORT_ROUNDS);
int iterationsCur = config.getInt(CURRENT_ROUND_NUM, 0);
List<List<byte[]>> res = new LinkedList<>();
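// Derive a chain id that is unique across partitions and import rounds: tempId is unique per
// (partition, round), and forcing chainId % totalPartitionNum == tempId keeps chains from colliding.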
long tempId = partitionId + iterationsCur * partitionNum;
long totalPartitionNum = partitionNum * iterationsNum;
long chainId = Math.abs(rand.nextLong());
chainId = chainId - (chainId % totalPartitionNum) + tempId;
byte[] chainIdArray = Bytes.toBytes(chainId);
long currentRow = 0;
long nextRow = getNextRow(0, chainLength);
for(long i = 0; i < chainLength; i++) {
byte[] rk = Bytes.toBytes(currentRow);
// Insert record into a list
List<byte[]> tmp1 = Arrays.asList(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
List<byte[]> tmp2 = Arrays.asList(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
List<byte[]> tmp3 = Arrays.asList(rk, DATA_FAM, chainIdArray, Bytes.toBytes(
RandomStringUtils.randomAlphabetic(50)));
res.add(tmp1);
res.add(tmp2);
res.add(tmp3);
currentRow = nextRow;
nextRow = getNextRow(i+1, chainLength);
}
return res.iterator();
}
/** Returns a unique row id within this chain for this index */
private long getNextRow(long index, long chainLength) {
long nextRow = Math.abs(new Random().nextLong());
// use significant bits from the random number, but pad with index to ensure it is unique
// this also ensures that we do not reuse row = 0
// row collisions from multiple mappers are fine, since we guarantee unique chainIds
nextRow = nextRow - (nextRow % chainLength) + index;
return nextRow;
}
}
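// Converts a 4-element record [rowKey, family, qualifier, value] into the (KeyFamilyQualifier, value)
// pair consumed by JavaHBaseContext.bulkLoad; malformed records are dropped by returning null.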
public static class ListToKeyValueFunc implements
Function<List<byte[]>, Pair<KeyFamilyQualifier, byte[]>> {
@Override
public Pair<KeyFamilyQualifier, byte[]> call(List<byte[]> v1) throws Exception {
if (v1 == null || v1.size() != 4) {
return null;
}
KeyFamilyQualifier kfq = new KeyFamilyQualifier(v1.get(0), v1.get(1), v1.get(2));
return new Pair<>(kfq, v1.get(3));
}
}
/**
* After adding data to the table, starts a Spark job that verifies each linked list is complete.
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public void runCheck() throws Exception {
LOG.info("Running check");
String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" + EnvironmentEdgeManager.currentTime();
SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
Configuration hbaseConf = new Configuration(getConf());
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf);
Scan scan = new Scan();
scan.addFamily(CHAIN_FAM);
scan.addFamily(SORT_FAM);
scan.setMaxVersions(1);
scan.setCacheBlocks(false);
scan.setBatch(1000);
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
if (replicaCount != DEFAULT_NUM_REPLICA_COUNT) {
scan.setConsistency(Consistency.TIMELINE);
}
// 1. Using TableInputFormat to get data from HBase table
// 2. Mimic LinkedListCheckingMapper in mapreduce.IntegrationTestBulkLoad
// 3. Sort LinkKey by its order ID
// 4. Group LinkKey if they have same chainId, and repartition RDD by NaturalKeyPartitioner
// 5. Check LinkList in each Partition using LinkedListCheckingFlatMapFunc
hbaseContext.hbaseRDD(getTablename(), scan).flatMapToPair(new LinkedListCheckingFlatMapFunc())
.sortByKey()
.combineByKey(new createCombinerFunc(), new mergeValueFunc(), new mergeCombinersFunc(),
new NaturalKeyPartitioner(new SerializableWritable<>(hbaseConf)))
.foreach(new LinkedListCheckingForeachFunc(new SerializableWritable<>(hbaseConf)));
jsc.close();
}
private void runCheckWithRetry() throws Exception {
try {
runCheck();
} catch (Throwable t) {
LOG.warn("Received " + StringUtils.stringifyException(t));
LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
runCheck();
throw t; // we should still fail the test even if second retry succeeds
}
// everything green
}
/**
* PairFlatMapFunction used to transform each <Row, Result> entry into <SparkLinkKey, SparkLinkChain> tuples
*/
public static class LinkedListCheckingFlatMapFunc implements
PairFlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, SparkLinkKey, SparkLinkChain> {
@Override
public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable, Result> v)
throws Exception {
Result value = v._2();
long longRk = Bytes.toLong(value.getRow());
List<Tuple2<SparkLinkKey, SparkLinkChain>> list = new LinkedList<>();
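// Each CHAIN_FAM cell has qualifier = chainId and value = next row in that chain; the SORT_FAM cell
// with the same qualifier holds this link's position, which becomes the SparkLinkKey order.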
for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
long chainId = Bytes.toLong(entry.getKey());
long next = Bytes.toLong(entry.getValue());
Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
long order = Bytes.toLong(CellUtil.cloneValue(c));
Tuple2<SparkLinkKey, SparkLinkChain> tuple2 =
new Tuple2<>(new SparkLinkKey(chainId, order), new SparkLinkChain(longRk, next));
list.add(tuple2);
}
return list;
}
}
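// The next three functions implement the combineByKey contract used in runCheck(): create a
// single-element list for the first link of a chain, append subsequent links, and merge partial
// lists, so every chain ends up as one List<SparkLinkChain>.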
public static class createCombinerFunc implements
Function<SparkLinkChain, List<SparkLinkChain>> {
@Override
public List<SparkLinkChain> call(SparkLinkChain v1) throws Exception {
List<SparkLinkChain> list = new LinkedList<>();
list.add(v1);
return list;
}
}
public static class mergeValueFunc implements
Function2<List<SparkLinkChain>, SparkLinkChain, List<SparkLinkChain>> {
@Override
public List<SparkLinkChain> call(List<SparkLinkChain> v1, SparkLinkChain v2) throws Exception {
if (v1 == null)
v1 = new LinkedList<>();
v1.add(v2);
return v1;
}
}
public static class mergeCombinersFunc implements
Function2<List<SparkLinkChain>, List<SparkLinkChain>, List<SparkLinkChain>> {
@Override
public List<SparkLinkChain> call(List<SparkLinkChain> v1, List<SparkLinkChain> v2) throws Exception {
v1.addAll(v2);
return v1;
}
}
/**
* Class to figure out what partition to send a link in the chain to. This is based upon
* the linkKey's ChainId.
*/
public static class NaturalKeyPartitioner extends Partitioner {
private int numPartions = 0;
public NaturalKeyPartitioner(SerializableWritable swConf) {
Configuration hbaseConf = (Configuration) swConf.value();
numPartions = hbaseConf.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM);
}
@Override
public int numPartitions() {
return numPartions;
}
@Override
public int getPartition(Object key) {
if (!(key instanceof SparkLinkKey))
return -1;
int hash = ((SparkLinkKey) key).getChainId().hashCode();
return Math.abs(hash % numPartions);
}
}
/**
* Walks the already-sorted List<SparkLinkChain> for one SparkLinkKey and verifies the chain is complete.
*/
public static class LinkedListCheckingForeachFunc
implements VoidFunction<Tuple2<SparkLinkKey, List<SparkLinkChain>>> {
private SerializableWritable swConf = null;
public LinkedListCheckingForeachFunc(SerializableWritable conf) {
swConf = conf;
}
@Override
public void call(Tuple2<SparkLinkKey, List<SparkLinkChain>> v1) throws Exception {
long next = -1L;
long prev = -1L;
long count = 0L;
SparkLinkKey key = v1._1();
List<SparkLinkChain> values = v1._2();
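// Walk the chain in sorted order: it must start at row 0, each link's rk must equal the previous
// link's next pointer, and the total number of links must equal the configured chain length.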
for (SparkLinkChain lc : values) {
if (next == -1) {
if (lc.getRk() != 0L) {
String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
+ ". Chain:" + key.getChainId() + ", order:" + key.getOrder();
throw new RuntimeException(msg);
}
next = lc.getNext();
} else {
if (next != lc.getRk()) {
String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting "
+ next + " but got " + lc.getRk() + ". Chain:" + key.getChainId()
+ ", order:" + key.getOrder();
throw new RuntimeException(msg);
}
prev = lc.getRk();
next = lc.getNext();
}
count++;
}
Configuration hbaseConf = (Configuration) swConf.value();
int expectedChainLen = hbaseConf.getInt(BULKLOAD_CHAIN_LENGTH, DEFAULT_BULKLOAD_CHAIN_LENGTH);
if (count != expectedChainLen) {
String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
+ count + ". Chain:" + key.getChainId() + ", order:" + key.getOrder();
throw new RuntimeException(msg);
}
}
}
/**
* Serializable key used to group links in the linked list.
*
* Used as the key emitted from a pass over the table.
*/
public static class SparkLinkKey implements java.io.Serializable, Comparable<SparkLinkKey> {
private Long chainId;
private Long order;
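// equals()/hashCode() intentionally use only chainId, so combineByKey groups all links of a chain
// under one key, while compareTo() orders by (chainId, order) so sortByKey() emits links in chain order.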
public Long getOrder() {
return order;
}
public Long getChainId() {
return chainId;
}
public SparkLinkKey(long chainId, long order) {
this.chainId = chainId;
this.order = order;
}
@Override
public int hashCode() {
return this.getChainId().hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SparkLinkKey))
return false;
SparkLinkKey otherKey = (SparkLinkKey) other;
return this.getChainId().equals(otherKey.getChainId());
}
@Override
public int compareTo(SparkLinkKey other) {
int res = getChainId().compareTo(other.getChainId());
if (res == 0)
res= getOrder().compareTo(other.getOrder());
return res;
}
}
/**
* Serializable value emitted from a pass over the HBase table.
*/
public static class SparkLinkChain implements java.io.Serializable, Comparable<SparkLinkChain>{
public Long getNext() {
return next;
}
public Long getRk() {
return rk;
}
public SparkLinkChain(Long rk, Long next) {
this.rk = rk;
this.next = next;
}
private Long rk;
private Long next;
@Override
public int compareTo(SparkLinkChain linkChain) {
int res = getRk().compareTo(linkChain.getRk());
if (res == 0) {
res = getNext().compareTo(linkChain.getNext());
}
return res;
}
@Override
public int hashCode() {
return getRk().hashCode() ^ getNext().hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SparkLinkChain))
return false;
SparkLinkChain otherKey = (SparkLinkChain) other;
return this.getRk().equals(otherKey.getRk()) && this.getNext().equals(otherKey.getNext());
}
}
/**
* Installs a slowing coprocessor so that scans can fall back to region replicas.
* This does not affect runCheck(), since the data was bulk loaded from HFiles into the table.
* @throws IOException
* @throws InterruptedException
*/
private void installSlowingCoproc() throws IOException, InterruptedException {
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
TableName t = getTablename();
Admin admin = util.getAdmin();
HTableDescriptor desc = admin.getTableDescriptor(t);
desc.addCoprocessor(IntegrationTestBulkLoad.SlowMeCoproScanOperations.class.getName());
HBaseTestingUtility.modifyTableSync(admin, desc);
}
@Test
public void testBulkLoad() throws Exception {
runLoad();
installSlowingCoproc();
runCheckWithRetry();
}
private byte[][] getSplits(int numRegions) {
RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
split.setFirstRow(Bytes.toBytes(0L));
split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
return split.split(numRegions);
}
private void setupTable() throws IOException, InterruptedException {
if (util.getAdmin().tableExists(getTablename())) {
util.deleteTable(getTablename());
}
util.createTable(
getTablename(),
new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
getSplits(16)
);
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
TableName t = getTablename();
HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
}
@Override
public void setUpCluster() throws Exception {
util = getTestingUtil(getConf());
util.initializeCluster(1);
int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
if (LOG.isDebugEnabled() && replicaCount != DEFAULT_NUM_REPLICA_COUNT) {
LOG.debug("Region Replicas enabled: " + replicaCount);
}
// Scale this up on a real cluster
if (util.isDistributedCluster()) {
util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM, String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM));
util.getConfiguration().setIfUnset(BULKLOAD_IMPORT_ROUNDS, "1");
} else {
util.startMiniMapReduceCluster();
}
}
@Override
protected void addOptions() {
super.addOptions();
super.addOptNoArg(OPT_CHECK, "Run check only");
super.addOptNoArg(OPT_LOAD, "Run load only");
}
@Override
protected void processOptions(CommandLine cmd) {
super.processOptions(cmd);
check = cmd.hasOption(OPT_CHECK);
load = cmd.hasOption(OPT_LOAD);
}
@Override
public int runTestFromCommandLine() throws Exception {
if (load) {
runLoad();
} else if (check) {
installSlowingCoproc();
runCheckWithRetry();
} else {
testBulkLoad();
}
return 0;
}
@Override
public TableName getTablename() {
return getTableName(getConf());
}
public static TableName getTableName(Configuration conf) {
return TableName.valueOf(conf.get(BULKLOAD_TABLE_NAME, DEFAULT_BULKLOAD_TABLE_NAME));
}
@Override
protected Set<String> getColumnFamilies() {
return Sets.newHashSet(Bytes.toString(CHAIN_FAM) , Bytes.toString(DATA_FAM),
Bytes.toString(SORT_FAM));
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
int status = ToolRunner.run(conf, new IntegrationTestSparkBulkLoad(), args);
System.exit(status);
}
}

pom.xml

@@ -86,6 +86,7 @@
<module>hbase-archetypes</module>
<module>hbase-metrics-api</module>
<module>hbase-metrics</module>
<module>hbase-spark-it</module>
</modules>
<!--Add apache snapshots in case we want to use unreleased versions of plugins:
e.g. surefire 2.18-SNAPSHOT-->