HBASE-19482 Fixed remaining Checkstyle errors in hbase-spark-it and enabled Checkstyle to fail on violations

Jan Hentschel 2017-12-10 22:18:03 +01:00
parent 988ea870ed
commit 16a851431c
2 changed files with 95 additions and 61 deletions

View File

@@ -178,7 +178,22 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>checkstyle</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <configuration>
+              <failOnViolation>true</failOnViolation>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
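With the check goal bound to the validate phase and failOnViolation set to true, a Checkstyle violation in hbase-spark-it now breaks the build at the start of the Maven lifecycle instead of only being reported. A minimal way to exercise the new binding locally, assuming hbase-spark-it is addressable as a module from the repository root, is:

    mvn -pl hbase-spark-it validate          # runs the bound checkstyle:check execution and fails on violations
    mvn -pl hbase-spark-it checkstyle:check  # invokes the same goal directly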

View File

@@ -19,6 +19,18 @@
 package org.apache.hadoop.hbase.spark;
 import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.logging.Log;
@@ -35,7 +47,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,41 +55,29 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.SerializableWritable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.junit.Test;
 import scala.Tuple2;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 /**
  * Test Bulk Load and Spark on a distributed cluster.
@@ -89,7 +88,8 @@ import java.util.Set;
  * First add hbase related jars and hbase-spark.jar into spark classpath.
  *
  * spark-submit --class org.apache.hadoop.hbase.spark.IntegrationTestSparkBulkLoad
- * HBASE_HOME/lib/hbase-spark-it-XXX-tests.jar -m slowDeterministic -Dhbase.spark.bulkload.chainlength=300
+ * HBASE_HOME/lib/hbase-spark-it-XXX-tests.jar -m slowDeterministic
+ * -Dhbase.spark.bulkload.chainlength=300
  */
 public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
@@ -140,7 +140,7 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   /**
    * Running spark job to create LinkedList for testing
    * @param iteration iteration th of this job
-   * @throws Exception
+   * @throws Exception if an HBase operation or getting the test directory fails
    */
   public void runLinkedListSparkJob(int iteration) throws Exception {
     String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + " _load " +
@@ -168,7 +168,8 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     LOG.info("Partition RDD into " + partitionNum + " parts");
     List<String> temp = new ArrayList<>();
     JavaRDD<List<byte[]>> rdd = jsc.parallelize(temp, partitionNum).
-        mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)), false);
+        mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)),
+            false);
     hbaseContext.bulkLoad(rdd, getTablename(), new ListToKeyValueFunc(), output.toUri().getPath(),
         new HashMap<>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -267,14 +268,12 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   }
   /**
-   * After adding data to the table start a mr job to
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
+   * After adding data to the table start a mr job to check the bulk load.
    */
   public void runCheck() throws Exception {
     LOG.info("Running check");
-    String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" + EnvironmentEdgeManager.currentTime();
+    String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" +
+        EnvironmentEdgeManager.currentTime();
     SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
     Configuration hbaseConf = new Configuration(getConf());
@@ -318,14 +317,15 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   }
   /**
-   * PairFlatMapFunction used to transfer <Row, Result> to Tuple <SparkLinkKey, SparkLinkChain>
+   * PairFlatMapFunction used to transfer {@code <Row, Result>} to
+   * {@code Tuple<SparkLinkKey, SparkLinkChain>}.
    */
   public static class LinkedListCheckingFlatMapFunc implements
       PairFlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, SparkLinkKey, SparkLinkChain> {
     @Override
-    public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable, Result> v)
-        throws Exception {
+    public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable,
+        Result> v) throws Exception {
       Result value = v._2();
       long longRk = Bytes.toLong(value.getRow());
       List<Tuple2<SparkLinkKey, SparkLinkChain>> list = new LinkedList<>();
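The Javadoc rewrites in this hunk (and the similar {@code List<LinkChain>} change below) are presumably needed because bare generic types such as <Row, Result> are parsed as HTML tags by the Javadoc tool and flagged by Checkstyle's HTML-aware Javadoc checks; wrapping them in {@code ...} keeps the angle brackets literal. A small illustrative sketch of the pattern, not taken from the patch (the helper method is hypothetical):

    /**
     * Pairs a row key with its length as a {@code Tuple2<String, Integer>}.
     */
    public static Tuple2<String, Integer> withLength(String rowKey) {
      // scala.Tuple2 is already imported by this test class
      return new Tuple2<>(rowKey, rowKey.length());
    }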
@@ -357,8 +357,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
       Function2<List<SparkLinkChain>, SparkLinkChain, List<SparkLinkChain>> {
     @Override
     public List<SparkLinkChain> call(List<SparkLinkChain> v1, SparkLinkChain v2) throws Exception {
-      if (v1 == null)
-        v1 = new LinkedList<>();
+      if (v1 == null) {
+        v1 = new LinkedList<>();
+      }
       v1.add(v2);
       return v1;
     }
@@ -367,7 +369,8 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   public static class mergeCombinersFunc implements
       Function2<List<SparkLinkChain>, List<SparkLinkChain>, List<SparkLinkChain>> {
     @Override
-    public List<SparkLinkChain> call(List<SparkLinkChain> v1, List<SparkLinkChain> v2) throws Exception {
+    public List<SparkLinkChain> call(List<SparkLinkChain> v1, List<SparkLinkChain> v2)
+        throws Exception {
       v1.addAll(v2);
       return v1;
     }
@@ -393,8 +396,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     @Override
     public int getPartition(Object key) {
-      if (!(key instanceof SparkLinkKey))
-        return -1;
+      if (!(key instanceof SparkLinkKey)) {
+        return -1;
+      }
       int hash = ((SparkLinkKey) key).getChainId().hashCode();
       return Math.abs(hash % numPartions);
@@ -402,7 +407,7 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   }
   /**
-   * Sort all LinkChain for one LinkKey, and test List<LinkChain>
+   * Sort all LinkChain for one LinkKey, and test {@code List<LinkChain>}.
    */
   public static class LinkedListCheckingForeachFunc
       implements VoidFunction<Tuple2<SparkLinkKey, List<SparkLinkChain>>> {
@@ -483,8 +488,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     @Override
     public boolean equals(Object other) {
-      if (!(other instanceof SparkLinkKey))
-        return false;
+      if (!(other instanceof SparkLinkKey)) {
+        return false;
+      }
       SparkLinkKey otherKey = (SparkLinkKey) other;
       return this.getChainId().equals(otherKey.getChainId());
     }
@@ -492,8 +499,11 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     @Override
     public int compareTo(SparkLinkKey other) {
       int res = getChainId().compareTo(other.getChainId());
-      if (res == 0)
-        res= getOrder().compareTo(other.getOrder());
+      if (res == 0) {
+        res = getOrder().compareTo(other.getOrder());
+      }
       return res;
     }
   }
@@ -536,8 +546,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     @Override
     public boolean equals(Object other) {
-      if (!(other instanceof SparkLinkChain))
-        return false;
+      if (!(other instanceof SparkLinkChain)) {
+        return false;
+      }
       SparkLinkChain otherKey = (SparkLinkChain) other;
       return this.getRk().equals(otherKey.getRk()) && this.getNext().equals(otherKey.getNext());
     }
@@ -547,12 +559,15 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
   /**
    * Allow the scan to go to replica, this would not affect the runCheck()
    * Since data are BulkLoaded from HFile into table
-   * @throws IOException
-   * @throws InterruptedException
+   * @throws IOException if an HBase operation fails
+   * @throws InterruptedException if modifying the table fails
    */
   private void installSlowingCoproc() throws IOException, InterruptedException {
     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
-    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
+    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) {
+      return;
+    }
     TableName t = getTablename();
     Admin admin = util.getAdmin();
@@ -588,7 +603,10 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     );
     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
-    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
+    if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) {
+      return;
+    }
     TableName t = getTablename();
     HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
@@ -605,7 +623,8 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
     // Scale this up on a real cluster
     if (util.isDistributedCluster()) {
-      util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM, String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM));
+      util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM,
+          String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM));
       util.getConfiguration().setIfUnset(BULKLOAD_IMPORT_ROUNDS, "1");
     } else {
       util.startMiniMapReduceCluster();