diff --git a/hbase-spark-it/pom.xml b/hbase-spark-it/pom.xml
index a71c9b896d4..3910dc85411 100644
--- a/hbase-spark-it/pom.xml
+++ b/hbase-spark-it/pom.xml
@@ -159,26 +159,41 @@
-      <plugin>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>create-mrapp-generated-classpath</id>
-            <phase>generate-test-resources</phase>
-            <goals>
-              <goal>build-classpath</goal>
-            </goals>
-            <configuration>
-              <outputFile>
-                ${project.build.directory}/test-classes/spark-generated-classpath
-              </outputFile>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-mrapp-generated-classpath</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <outputFile>
+                ${project.build.directory}/test-classes/spark-generated-classpath
+              </outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>checkstyle</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <configuration>
+              <failOnViolation>true</failOnViolation>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
index b22c9ca4631..89eb087e66c 100644
--- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
@@ -19,6 +19,18 @@ package org.apache.hadoop.hbase.spark;
 
 import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.logging.Log;
@@ -35,7 +47,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.TableName;
-
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,41 +55,29 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RegionSplitter;
-
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.spark.Partitioner;
 import org.apache.spark.SerializableWritable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-
-import org.apache.spark.Partitioner;
-
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.VoidFunction;
+
 import org.junit.Test;
+
 import scala.Tuple2;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 
 /**
  * Test Bulk Load and Spark on a distributed cluster.
@@ -89,7 +88,8 @@ import java.util.Set;
  * First add hbase related jars and hbase-spark.jar into spark classpath.
  *
  * spark-submit --class org.apache.hadoop.hbase.spark.IntegrationTestSparkBulkLoad
- * HBASE_HOME/lib/hbase-spark-it-XXX-tests.jar -m slowDeterministic -Dhbase.spark.bulkload.chainlength=300
+ * HBASE_HOME/lib/hbase-spark-it-XXX-tests.jar -m slowDeterministic
+ * -Dhbase.spark.bulkload.chainlength=300
  */
 public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
 
@@ -140,7 +140,7 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   /**
    * Running spark job to create LinkedList for testing
    * @param iteration iteration th of this job
-   * @throws Exception
+   * @throws Exception if an HBase operation or getting the test directory fails
    */
   public void runLinkedListSparkJob(int iteration) throws Exception {
     String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + " _load " +
@@ -168,7 +168,8 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     LOG.info("Partition RDD into " + partitionNum + " parts");
     List<String> temp = new ArrayList<>();
     JavaRDD<List<byte[]>> rdd = jsc.parallelize(temp, partitionNum).
-        mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)), false);
+        mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)),
+            false);
 
     hbaseContext.bulkLoad(rdd, getTablename(), new ListToKeyValueFunc(), output.toUri().getPath(),
         new HashMap<>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -267,14 +268,12 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   }
 
   /**
-   * After adding data to the table start a mr job to
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
+   * After adding data to the table start a mr job to check the bulk load.
    */
   public void runCheck() throws Exception {
     LOG.info("Running check");
-    String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" + EnvironmentEdgeManager.currentTime();
+    String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" +
+        EnvironmentEdgeManager.currentTime();
 
     SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
     Configuration hbaseConf = new Configuration(getConf());
@@ -318,14 +317,15 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   }
 
   /**
-   * PairFlatMapFunction used to transfer <Row, Result> to Tuple<SparkLinkKey, SparkLinkChain>
+   * PairFlatMapFunction used to transfer {@code <Row, Result>} to
+   * {@code Tuple<SparkLinkKey, SparkLinkChain>}.
    */
   public static class LinkedListCheckingFlatMapFunc implements
       PairFlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, SparkLinkKey, SparkLinkChain> {
 
     @Override
-    public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable, Result> v)
-        throws Exception {
+    public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable, Result> v) throws Exception {
       Result value = v._2();
       long longRk = Bytes.toLong(value.getRow());
       List<Tuple2<SparkLinkKey, SparkLinkChain>> list = new LinkedList<>();
@@ -357,8 +357,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
       Function2<List<SparkLinkChain>, SparkLinkChain, List<SparkLinkChain>> {
     @Override
     public List<SparkLinkChain> call(List<SparkLinkChain> v1, SparkLinkChain v2) throws Exception {
-      if (v1 == null)
+      if (v1 == null) {
         v1 = new LinkedList<>();
+      }
+
       v1.add(v2);
       return v1;
     }
@@ -367,7 +369,8 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   public static class mergeCombinersFunc implements
       Function2<List<SparkLinkChain>, List<SparkLinkChain>, List<SparkLinkChain>> {
     @Override
-    public List<SparkLinkChain> call(List<SparkLinkChain> v1, List<SparkLinkChain> v2) throws Exception {
+    public List<SparkLinkChain> call(List<SparkLinkChain> v1, List<SparkLinkChain> v2)
+        throws Exception {
       v1.addAll(v2);
       return v1;
     }
@@ -393,8 +396,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
 
     @Override
     public int getPartition(Object key) {
-      if (!(key instanceof SparkLinkKey))
+      if (!(key instanceof SparkLinkKey)) {
         return -1;
+      }
+
       int hash = ((SparkLinkKey) key).getChainId().hashCode();
       return Math.abs(hash % numPartions);
@@ -402,7 +407,7 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   }
 
   /**
-   * Sort all LinkChain for one LinkKey, and test List<SparkLinkChain>
+   * Sort all LinkChain for one LinkKey, and test {@code List<SparkLinkChain>}.
    */
   public static class LinkedListCheckingForeachFunc
       implements VoidFunction<Tuple2<SparkLinkKey, List<SparkLinkChain>>> {
@@ -483,8 +488,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
 
     @Override
     public boolean equals(Object other) {
-      if (!(other instanceof SparkLinkKey))
+      if (!(other instanceof SparkLinkKey)) {
         return false;
+      }
+
       SparkLinkKey otherKey = (SparkLinkKey) other;
       return this.getChainId().equals(otherKey.getChainId());
     }
@@ -492,8 +499,11 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     @Override
     public int compareTo(SparkLinkKey other) {
       int res = getChainId().compareTo(other.getChainId());
-      if (res == 0)
-        res= getOrder().compareTo(other.getOrder());
+
+      if (res == 0) {
+        res = getOrder().compareTo(other.getOrder());
+      }
+
       return res;
     }
   }
@@ -536,8 +546,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
 
     @Override
     public boolean equals(Object other) {
-      if (!(other instanceof SparkLinkChain))
+      if (!(other instanceof SparkLinkChain)) {
         return false;
+      }
+
       SparkLinkChain otherKey = (SparkLinkChain) other;
       return this.getRk().equals(otherKey.getRk()) && this.getNext().equals(otherKey.getNext());
     }
@@ -547,12 +559,15 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   /**
    * Allow the scan to go to replica, this would not affect the runCheck()
    * Since data are BulkLoaded from HFile into table
-   * @throws IOException
-   * @throws InterruptedException
+   * @throws IOException if an HBase operation fails
+   * @throws InterruptedException if modifying the table fails
    */
   private void installSlowingCoproc() throws IOException, InterruptedException {
     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
-    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
+
+    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) {
+      return;
+    }
 
     TableName t = getTablename();
     Admin admin = util.getAdmin();
@@ -588,7 +603,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     );
     int replicaCount =
         conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
-    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
+
+    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) {
+      return;
+    }
 
     TableName t = getTablename();
     HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
@@ -605,7 +623,8 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
 
     // Scale this up on a real cluster
     if (util.isDistributedCluster()) {
-      util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM, String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM));
+      util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM,
+          String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM));
       util.getConfiguration().setIfUnset(BULKLOAD_IMPORT_ROUNDS, "1");
     } else {
       util.startMiniMapReduceCluster();