From 30f7d127c3974cff9e3058e13d7c50805ee4482f Mon Sep 17 00:00:00 2001 From: Ted Malaska Date: Tue, 28 Jul 2015 11:10:37 -0500 Subject: [PATCH] HBASE-13992 Integrate SparkOnHBase into HBase Signed-off-by: Sean Busbey --- dev-support/test-patch.properties | 3 +- hbase-spark/pom.xml | 572 ++++++++++++++++++ .../JavaHBaseBulkDeleteExample.java | 80 +++ .../hbasecontext/JavaHBaseBulkGetExample.java | 115 ++++ .../hbasecontext/JavaHBaseBulkPutExample.java | 90 +++ .../JavaHBaseDistributedScan.java | 81 +++ .../JavaHBaseMapGetPutExample.java | 105 ++++ .../JavaHBaseStreamingBulkPutExample.java | 90 +++ .../hadoop/hbase/spark/HBaseContext.scala | 570 +++++++++++++++++ .../hbase/spark/HBaseDStreamFunctions.scala | 158 +++++ .../hbase/spark/HBaseRDDFunctions.scala | 162 +++++ .../hadoop/hbase/spark/JavaHBaseContext.scala | 347 +++++++++++ .../hbasecontext/HBaseBulkDeleteExample.scala | 63 ++ .../hbasecontext/HBaseBulkGetExample.scala | 93 +++ .../hbasecontext/HBaseBulkPutExample.scala | 75 +++ .../HBaseBulkPutExampleFromFile.scala | 76 +++ .../HBaseBulkPutTimestampExample.scala | 77 +++ .../HBaseDistributedScanExample.scala | 61 ++ .../HBaseStreamingBulkPutExample.scala | 74 +++ .../example/rdd/HBaseBulkDeleteExample.scala | 64 ++ .../example/rdd/HBaseBulkGetExample.scala | 88 +++ .../example/rdd/HBaseBulkPutExample.scala | 76 +++ .../rdd/HBaseForeachPartitionExample.scala | 83 +++ .../rdd/HBaseMapPartitionExample.scala | 89 +++ .../hbase/spark/JavaHBaseContextSuite.java | 334 ++++++++++ .../hbase/spark/HBaseContextSuite.scala | 344 +++++++++++ .../spark/HBaseDStreamFunctionsSuite.scala | 129 ++++ .../hbase/spark/HBaseRDDFunctionsSuite.scala | 398 ++++++++++++ pom.xml | 1 + 29 files changed, 4497 insertions(+), 1 deletion(-) create mode 100644 hbase-spark/pom.xml create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java create mode 100644 hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala create mode 100644 
hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala create mode 100644 hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java create mode 100644 hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala create mode 100644 hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala create mode 100644 hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala diff --git a/dev-support/test-patch.properties b/dev-support/test-patch.properties index c652e3f4851..7e759659be2 100644 --- a/dev-support/test-patch.properties +++ b/dev-support/test-patch.properties @@ -21,7 +21,8 @@ MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}" OK_RELEASEAUDIT_WARNINGS=0 # Allow four warnings. Javadoc complains about sun.misc.Unsafe use. # See HBASE-7457, HBASE-13761 -OK_JAVADOC_WARNINGS=7 +# Allow 2 additional warnings for Scala stub notice about MR. See HBASE-13992 +OK_JAVADOC_WARNINGS=9 MAX_LINE_LENGTH=100 diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml new file mode 100644 index 00000000000..e48f9e81ad6 --- /dev/null +++ b/hbase-spark/pom.xml @@ -0,0 +1,572 @@ + + + + + + 4.0.0 + + + hbase + org.apache.hbase + 2.0.0-SNAPSHOT + .. + + hbase-spark + Apache HBase - Spark + + + 1.3.0 + 2.10.4 + 2.10 + ${project.basedir}/.. 
+ + + + + + javax.servlet + javax.servlet-api + 3.0.1 + test + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + + org.scala-lang + scala-library + + + + org.scala-lang + scalap + + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + test-jar + tests + test + + + junit + junit + test + + + + org.scalatest + scalatest_${scala.binary.version} + 2.2.4 + test + + + + org.scalamock + scalamock-scalatest-support_${scala.binary.version} + 3.1.4 + test + + + + org.apache.hadoop + hadoop-client + ${hadoop-two.version} + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hadoop + hadoop-common + ${hadoop-two.version} + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hadoop + hadoop-common + ${hadoop-two.version} + test-jar + test + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop-two.version} + test-jar + test + + + log4j + log4j + + + javax.servlet + servlet-api + + + javax.servlet.jsp + jsp-api + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hbase + hbase-client + + + log4j + log4j + + + org.apache.thrift + thrift + + + org.jruby + jruby-complete + + + org.slf4j + slf4j-log4j12 + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hbase + hbase-hadoop-compat + ${project.version} + test + test-jar + + + log4j + log4j + + + org.apache.thrift + thrift + + + org.jruby + jruby-complete + + + org.slf4j + slf4j-log4j12 + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.jruby + jruby-complete + + + org.jboss.netty + netty + + + io.netty + netty + + + + + + org.apache.hbase + hbase-hadoop2-compat + ${project.version} + test + test-jar + + + log4j + log4j + + + org.apache.thrift + thrift + + + org.jruby + jruby-complete + + + org.slf4j + slf4j-log4j12 + + + org.mortbay.jetty + jsp-2.1 + + + org.mortbay.jetty + jsp-api-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.jruby + jruby-complete + + + org.jboss.netty + netty 
+ + + io.netty + netty + + + + + org.apache.hbase + hbase-server + ${project.version} + + + org.apache.hbase + hbase-server + ${project.version} + test + test-jar + + + org.apache.hbase + hbase-it + ${project.version} + test-jar + test + + + + + + src/test/scala + + + org.apache.maven.plugins + maven-compiler-plugin + + + + net.alchim31.maven + scala-maven-plugin + 3.2.0 + + ${project.build.sourceEncoding} + ${scala.version} + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . + WDF TestSuite.txt + false + + + + test + test + + test + + + true + + + + integration-test + integration-test + + test + + + Integration-Test + + -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + + false + + + + + + + + \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java new file mode 100644 index 00000000000..68b2edd6f2e --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is a simple example of deleting records in HBase + * with the bulkDelete function. 
+ */ +final public class JavaHBaseBulkDeleteExample { + + private JavaHBaseBulkDeleteExample() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseBulkDeleteExample {tableName}"); + return; + } + + String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkDelete(rdd, + TableName.valueOf(tableName), new DeleteFunction(), 4); + } finally { + jsc.stop(); + } + + } + + public static class DeleteFunction implements Function { + private static final long serialVersionUID = 1L; + public Delete call(byte[] v) throws Exception { + return new Delete(v); + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java new file mode 100644 index 00000000000..c7dcbb6a0db --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +/** + * This is a simple example of getting records in HBase + * with the bulkGet function. 
+ */ +final public class JavaHBaseBulkGetExample { + + private JavaHBaseBulkGetExample() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseBulkGetExample {tableName}"); + return; + } + + String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(), + new ResultFunction()); + } finally { + jsc.stop(); + } + } + + public static class GetFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } + + public static class ResultFunction implements Function { + + private static final long serialVersionUID = 1L; + + public String call(Result result) throws Exception { + Iterator it = result.listCells().iterator(); + StringBuilder b = new StringBuilder(); + + b.append(Bytes.toString(result.getRow())).append(":"); + + while (it.hasNext()) { + Cell cell = it.next(); + String q = Bytes.toString(cell.getQualifierArray()); + if (q.equals("counter")) { + b.append("(") + .append(Bytes.toString(cell.getQualifierArray())) + .append(",") + .append(Bytes.toLong(cell.getValueArray())) + .append(")"); + } else { + b.append("(") + .append(Bytes.toString(cell.getQualifierArray())) + .append(",") + .append(Bytes.toString(cell.getValueArray())) + .append(")"); + } + } + return b.toString(); + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java new file mode 100644 index 00000000000..ded508123d0 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +/** + * This is a simple example of putting records in HBase + * with the bulkPut function. + */ +final public class JavaHBaseBulkPutExample { + + private JavaHBaseBulkPutExample() {} + + public static void main(String[] args) { + if (args.length < 2) { + System.out.println("JavaHBaseBulkPutExample " + + "{tableName} {columnFamily}"); + return; + } + + String tableName = args[0]; + String columnFamily = args[1]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List list = new ArrayList<>(); + list.add("1," + columnFamily + ",a,1"); + list.add("2," + columnFamily + ",a,2"); + list.add("3," + columnFamily + ",a,3"); + list.add("4," + columnFamily + ",a,4"); + list.add("5," + columnFamily + ",a,5"); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkPut(rdd, + TableName.valueOf(tableName), + new PutFunction()); + } finally { + jsc.stop(); + } + } + + public static class PutFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] cells = v.split(","); + Put put = new Put(Bytes.toBytes(cells[0])); + + put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]), + Bytes.toBytes(cells[3])); + return put; + } + + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java new file mode 100644 index 00000000000..6192ad9ba47 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.spark.api.java.function.Function; +import scala.Tuple2; + +/** + * This is a simple example of scanning records from HBase + * with the hbaseRDD function. + */ +final public class JavaHBaseDistributedScan { + + private JavaHBaseDistributedScan() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseDistributedScan {tableName}"); + return; + } + + String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + Scan scan = new Scan(); + scan.setCaching(100); + + JavaRDD> javaRdd = + hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan); + + List results = javaRdd.map(new ScanConvertFunction()).collect(); + + System.out.println("Result Size: " + results.size()); + } finally { + jsc.stop(); + } + } + + private static class ScanConvertFunction implements + Function, String> { + @Override + public String call(Tuple2 v1) throws Exception { + return Bytes.toString(v1._1().copyBytes()); + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java new file mode 100644 index 00000000000..0d41a704ad7 --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; + +import scala.Tuple2; + +/** + * This is a simple example of using the foreachPartition + * method with a HBase connection + */ +final public class JavaHBaseMapGetPutExample { + + private JavaHBaseMapGetPutExample() {} + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("JavaHBaseBulkGetExample {tableName}"); + return; + } + + final String tableName = args[0]; + + SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + List list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); + list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.foreachPartition(rdd, + new VoidFunction, Connection>>() { + public void call(Tuple2, Connection> t) + throws Exception { + Table table = t._2().getTable(TableName.valueOf(tableName)); + BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName)); + + while (t._1().hasNext()) { + byte[] b = t._1().next(); + Result r = table.get(new Get(b)); + if (r.getExists()) { + mutator.mutate(new Put(b)); + } + } + + mutator.flush(); + mutator.close(); + table.close(); + } + }); + } finally { + jsc.stop(); + } + } + + public static class GetFunction implements Function { + private static final long serialVersionUID = 1L; + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } +} diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java new file mode 100644 index 00000000000..cd4cf24f15e --- /dev/null +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.spark.JavaHBaseContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +/** + * This is a simple example of BulkPut with Spark Streaming + */ +final public class JavaHBaseStreamingBulkPutExample { + + private JavaHBaseStreamingBulkPutExample() {} + + public static void main(String[] args) { + if (args.length < 4) { + System.out.println("JavaHBaseBulkPutExample " + + "{host} {port} {tableName}"); + return; + } + + String host = args[0]; + String port = args[1]; + String tableName = args[2]; + + SparkConf sparkConf = + new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " + + tableName + ":" + port + ":" + tableName); + + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + try { + JavaStreamingContext jssc = + new JavaStreamingContext(jsc, new Duration(1000)); + + JavaReceiverInputDStream javaDstream = + jssc.socketTextStream(host, Integer.parseInt(port)); + + Configuration conf = HBaseConfiguration.create(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.streamBulkPut(javaDstream, + TableName.valueOf(tableName), + new PutFunction()); + } finally { + jsc.stop(); + } + } + + public static class PutFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] part = v.split(","); + Put put = new Put(Bytes.toBytes(part[0])); + + put.addColumn(Bytes.toBytes(part[1]), + Bytes.toBytes(part[2]), + Bytes.toBytes(part[3])); + return put; + } + + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala new file mode 100644 index 00000000000..f060fea102d --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.ConnectionFactory +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import scala.reflect.ClassTag +import org.apache.hadoop.hbase.client.Connection +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Delete +import org.apache.spark.{Logging, SerializableWritable, SparkContext} +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.hbase.client.Mutation +import org.apache.spark.streaming.dstream.DStream +import java.io._ +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod +import org.apache.hadoop.hbase.mapreduce.TableInputFormat +import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper +import org.apache.hadoop.fs.{Path, FileSystem} + +/** + * HBaseContext is a façade for HBase operations + * like bulk put, get, increment, delete, and scan + * + * HBaseContext will take the responsibilities + * of disseminating the configuration information + * to the working and managing the life cycle of HConnections. + */ +class HBaseContext(@transient sc: SparkContext, + @transient config: Configuration, + val tmpHdfsConfgFile: String = null) + extends Serializable with Logging { + + @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + @transient var tmpHdfsConfiguration:Configuration = config + @transient var appliedCredentials = false + @transient val job = Job.getInstance(config) + TableMapReduceUtil.initCredentials(job) + val broadcastedConf = sc.broadcast(new SerializableWritable(config)) + val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials)) + + if (tmpHdfsConfgFile != null && config != null) { + val fs = FileSystem.newInstance(config) + val tmpPath = new Path(tmpHdfsConfgFile) + if (!fs.exists(tmpPath)) { + val outputStream = fs.create(tmpPath) + config.write(outputStream) + outputStream.close() + } else { + logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!") + } + } + + /** + * A simple enrichment of the traditional Spark RDD foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](rdd: RDD[T], + f: (Iterator[T], Connection) => Unit):Unit = { + rdd.foreachPartition( + it => hbaseForeachPartition(broadcastedConf, it, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit):Unit = { + dstream.foreachRDD((rdd, time) => { + foreachPartition(rdd, f) + }) + } + + /** + * A simple enrichment of the traditional Spark RDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T, R: ClassTag](rdd: RDD[T], + mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { + + rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf, + it, + mp)) + + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * foreachPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamForeachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit): Unit = { + + dstream.foreachRDD(rdd => this.foreachPartition(rdd, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
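// A minimal, illustrative Scala sketch of the foreachPartition enrichment documented
// above: the supplied function receives a partition iterator plus a Connection whose
// lifecycle is managed by HBaseContext. The table name "t1" and column family "c"
// are assumptions made only for this sketch, not part of the patch.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Get, Put}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ForeachPartitionSketch"))
    try {
      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
      val rowKeys = sc.parallelize(Seq(Bytes.toBytes("1"), Bytes.toBytes("2")))

      // Do not close the Connection here; HBaseContext owns it.
      hbaseContext.foreachPartition[Array[Byte]](rowKeys, (it, connection) => {
        val table = connection.getTable(TableName.valueOf("t1"))
        it.foreach { rowKey =>
          // Read the row and, if it exists, write back a marker column.
          val result = table.get(new Get(rowKey))
          if (!result.isEmpty) {
            table.put(new Put(rowKey).addColumn(
              Bytes.toBytes("c"), Bytes.toBytes("seen"), Bytes.toBytes("true")))
          }
        }
        table.close()
      })
    } finally {
      sc.stop()
    }
  }
}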
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamMapPartitions[T, U: ClassTag](dstream: DStream[T], + f: (Iterator[T], Connection) => Iterator[U]): + DStream[U] = { + dstream.mapPartitions(it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + f)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take RDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + */ + def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val m = connection.getBufferedMutator(TableName.valueOf(tName)) + iterator.foreach(T => m.mutate(f(T))) + m.flush() + m.close() + })) + } + + def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){ + credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + + logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials) + + if (!appliedCredentials && credentials != null) { + appliedCredentials = true + + @transient val ugi = UserGroupInformation.getCurrentUser + ugi.addCredentials(credentials) + // specify that this is a proxy user + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) + + ugi.addCredentials(credentialsConf.value.value) + } + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a DStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the DStream to a HBase Put + */ + def streamBulkPut[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Put) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkPut(rdd, TableName.valueOf(tName), f) + }) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and generate delete + * and send them to HBase. The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the RDD to a + * HBase Deletes + * @param batchSize The number of delete to batch before sending to HBase + */ + def bulkDelete[T](rdd: RDD[T], tableName: TableName, + f: (T) => Delete, batchSize: Integer) { + bulkMutation(rdd, tableName, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. 
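// A minimal, illustrative Scala sketch of the bulkPut and bulkDelete helpers documented
// above: each RDD record is converted to a Put or Delete by the supplied function, and
// HBaseContext manages the connection. Table "t1", column family "c", and the sample
// row keys are assumptions made only for this sketch.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Delete, Put}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object BulkPutDeleteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BulkPutDeleteSketch"))
    try {
      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

      // (rowKey, qualifier, value) triples to write.
      val putRdd = sc.parallelize(Seq(("1", "a", "foo"), ("2", "a", "bar")))
      hbaseContext.bulkPut[(String, String, String)](putRdd, TableName.valueOf("t1"),
        record => new Put(Bytes.toBytes(record._1))
          .addColumn(Bytes.toBytes("c"), Bytes.toBytes(record._2), Bytes.toBytes(record._3)))

      // Row keys to delete, batched four at a time.
      val deleteRdd = sc.parallelize(Seq(Bytes.toBytes("3"), Bytes.toBytes("4")))
      hbaseContext.bulkDelete[Array[Byte]](deleteRdd, TableName.valueOf("t1"),
        rowKey => new Delete(rowKey), 4)
    } finally {
      sc.stop()
    }
  }
}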
+ * + * It allow addition support for a user to take a DStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f function to convert a value in the DStream to a + * HBase Delete + * @param batchSize The number of deletes to batch before sending to HBase + */ + def streamBulkDelete[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Delete, + batchSize: Integer) = { + streamBulkMutation(dstream, tableName, f, batchSize) + } + + /** + * Under lining function to support all bulk mutations + * + * May be opened up if requested + */ + private def bulkMutation[T](rdd: RDD[T], tableName: TableName, + f: (T) => Mutation, batchSize: Integer) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val table = connection.getTable(TableName.valueOf(tName)) + val mutationList = new java.util.ArrayList[Mutation] + iterator.foreach(T => { + mutationList.add(f(T)) + if (mutationList.size >= batchSize) { + table.batch(mutationList, null) + mutationList.clear() + } + }) + if (mutationList.size() > 0) { + table.batch(mutationList, null) + mutationList.clear() + } + table.close() + })) + } + + /** + * Under lining function to support all bulk streaming mutations + * + * May be opened up if requested + */ + private def streamBulkMutation[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Mutation, + batchSize: Integer) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkMutation(rdd, TableName.valueOf(tName), f, batchSize) + }) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a RDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to get from + * @param makeGet function to convert a value in the RDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * RDD + * return new RDD that is created by the Get to HBase + */ + def bulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + rdd: RDD[T], + makeGet: (T) => Get, + convertResult: (Result) => U): RDD[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + rdd.mapPartitions[U](it => + hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
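// A minimal, illustrative Scala sketch of the bulkGet helper documented above: row keys
// in the RDD are turned into Gets, executed in batches of two, and each Result is
// converted into a string in the returned RDD. Table "t1" is an assumption made only
// for this sketch.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Get, Result}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object BulkGetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BulkGetSketch"))
    try {
      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
      val rowKeys = sc.parallelize(Seq(Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3")))

      val getRdd = hbaseContext.bulkGet[Array[Byte], String](
        TableName.valueOf("t1"),
        2,                 // Gets sent per batch
        rowKeys,
        rowKey => new Get(rowKey),
        (result: Result) =>
          if (result.isEmpty) "(missing)" else Bytes.toString(result.getRow))

      getRdd.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}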
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize The number of Gets to be sent in a single batch + * @param dStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the DStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * DStream + * @return A new DStream that is created by the Get to HBase + */ + def streamBulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + dStream: DStream[T], + makeGet: (T) => Get, + convertResult: (Result) => U): DStream[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + dStream.mapPartitions[U](it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new RDD + * + * @param tableName the name of the table to scan + * @param scan the HBase scan object to use to read data from HBase + * @param f function to convert a Result object from HBase into + * what the user wants in the final generated RDD + * @return new RDD with results from scan + */ + def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan, + f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { + + val job: Job = Job.getInstance(getConf(broadcastedConf)) + + TableMapReduceUtil.initCredentials(job) + TableMapReduceUtil.initTableMapperJob(tableName, scan, + classOf[IdentityTableMapper], null, null, job) + + sc.newAPIHadoopRDD(job.getConfiguration, + classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map(f) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that defines the + * type of the resulting RDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @return New RDD with results from scan + * + */ + def hbaseRDD(tableName: TableName, scans: Scan): + RDD[(ImmutableBytesWritable, Result)] = { + + hbaseRDD[(ImmutableBytesWritable, Result)]( + tableName, + scans, + (r: (ImmutableBytesWritable, Result)) => r) + } + + /** + * underlining wrapper all foreach functions in HBaseContext + */ + private def hbaseForeachPartition[T](configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[T], + f: (Iterator[T], Connection) => Unit) = { + + val config = getConf(configBroadcast) + + applyCreds(configBroadcast) + // specify that this is a proxy user + val connection = ConnectionFactory.createConnection(config) + f(it, connection) + connection.close() + } + + private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): + Configuration = { + + if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) { + val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf) + val inputStream = fs.open(new Path(tmpHdfsConfgFile)) + tmpHdfsConfiguration = new Configuration(false) + tmpHdfsConfiguration.readFields(inputStream) + inputStream.close() + } + + if (tmpHdfsConfiguration == null) { + try { + tmpHdfsConfiguration = configBroadcast.value.value + } catch { + case ex: Exception => logError("Unable to getConfig from broadcast", ex) + } + } + tmpHdfsConfiguration + } + + /** + * underlining wrapper all mapPartition functions 
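// A minimal, illustrative Scala sketch of the hbaseRDD helper documented above: a single
// Scan is turned into an RDD backed by TableInputFormat, here mapped down to row keys.
// Table "t1" is an assumption made only for this sketch.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object DistributedScanSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistributedScanSketch"))
    try {
      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

      val scan = new Scan()
      scan.setCaching(100)

      // RDD of (ImmutableBytesWritable, Result) pairs, one per scanned row.
      val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan)
      val rowKeys = scanRdd.map(pair => Bytes.toString(pair._1.copyBytes()))
      println("rows scanned: " + rowKeys.count())
    } finally {
      sc.stop()
    }
  }
}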
in HBaseContext + * + */ + private def hbaseMapPartition[K, U]( + configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[K], + mp: (Iterator[K], Connection) => + Iterator[U]): Iterator[U] = { + + val config = getConf(configBroadcast) + applyCreds(configBroadcast) + + val connection = ConnectionFactory.createConnection(config) + val res = mp(it, connection) + connection.close() + res + + } + + /** + * underlining wrapper all get mapPartition functions in HBaseContext + */ + private class GetMapPartition[T, U](tableName: TableName, + batchSize: Integer, + makeGet: (T) => Get, + convertResult: (Result) => U) + extends Serializable { + + val tName = tableName.getName + + def run(iterator: Iterator[T], connection: Connection): Iterator[U] = { + val table = connection.getTable(TableName.valueOf(tName)) + + val gets = new java.util.ArrayList[Get]() + var res = List[U]() + + while (iterator.hasNext) { + gets.add(makeGet(iterator.next())) + + if (gets.size() == batchSize) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + } + if (gets.size() > 0) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + table.close() + res.iterator + } + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as + * the Java compiler cannot produce them automatically. While this + * ClassTag-faking does please the compiler, it can cause problems at runtime + * if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, + * just worse performance or security issues. + * For instance, an Array of AnyRef can hold any type T, but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala new file mode 100644 index 00000000000..d563a29fa13 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** + * HBaseDStreamFunctions contains a set of implicit functions that can be + * applied to a Spark DStream so that we can easily interact with HBase + */ +object HBaseDStreamFunctions { + + /** + * These are implicit methods for a DStream that contains any type of + * data. + * + * @param dStream This is for dStreams of any type + * @tparam T Type T + */ + implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) { + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * put. This will not return a new Stream. Think of it like a foreach + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param f The function that will turn the DStream values + * into HBase Put objects. + */ + def hbaseBulkPut(hc: HBaseContext, + tableName: TableName, + f: (T) => Put): Unit = { + hc.streamBulkPut(dStream, tableName, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new DStream. Think about it as a DStream map + * function. In that every DStream value will get a new value out of + * HBase. That new value will populate the newly generated DStream. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @param convertResult The function that will convert a HBase + * Result object into a value that will go + * into the resulting DStream + * @tparam R The type of Object that will be coming + * out of the resulting DStream + * @return A resulting DStream with type R objects + */ + def hbaseBulkGet[R: ClassTag](hc: HBaseContext, + tableName: TableName, + batchSize:Int, f: (T) => Get, convertResult: (Result) => R): + DStream[R] = { + hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new DStream. Think about it as a DStream map + * function. In that every DStream value will get a new value out of + * HBase. That new value will populate the newly generated DStream. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @return A resulting DStream with type R objects + */ + def hbaseBulkGet(hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = { + hc.streamBulkGet[T, (ImmutableBytesWritable, Result)]( + tableName, batchSize, dStream, f, + result => (new ImmutableBytesWritable(result.getRow), result)) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * Delete. This will not return a new DStream. 
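// A minimal, illustrative Scala sketch of the DStream implicits documented above: after
// importing HBaseDStreamFunctions._, hbaseBulkPut can be called directly on a DStream.
// Table "t1", column family "c", the localhost:9999 socket source, and the
// "rowKey,qualifier,value" line format are assumptions made only for this sketch.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamBulkPutSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DStreamBulkPutSketch"))
    val ssc = new StreamingContext(sc, Seconds(1))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), (line: String) => {
      val parts = line.split(",")
      new Put(Bytes.toBytes(parts(0)))
        .addColumn(Bytes.toBytes("c"), Bytes.toBytes(parts(1)), Bytes.toBytes(parts(2)))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}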
+ * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param tableName The tableName that the deletes will be sent to + * @param f The function that will convert the DStream value into + * a HBase Delete Object + * @param batchSize The number of Deletes to be sent in a single batch + */ + def hbaseBulkDelete(hc: HBaseContext, + tableName: TableName, + f:(T) => Delete, batchSize:Int): Unit = { + hc.streamBulkDelete(dStream, tableName, f, batchSize) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * foreachPartition method. This will ack very much like a normal DStream + * foreach method but for the fact that you will now have a HBase connection + * while iterating through the values. + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * DStream along with a connection object to HBase + */ + def hbaseForeachPartition(hc: HBaseContext, + f: (Iterator[T], Connection) => Unit): Unit = { + hc.streamForeachPartition(dStream, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * mapPartitions method. This will ask very much like a normal DStream + * map partitions method but for the fact that you will now have a + * HBase connection while iterating through the values + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * DStream along with a connection object to HBase + * @tparam R This is the type of objects that will go into the resulting + * DStream + * @return A resulting DStream of type R + */ + def hbaseMapPartitions[R: ClassTag](hc: HBaseContext, + f: (Iterator[T], Connection) => Iterator[R]): + DStream[R] = { + hc.streamMapPartitions(dStream, f) + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala new file mode 100644 index 00000000000..fb8456d0bb5 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +/** + * HBaseRDDFunctions contains a set of implicit functions that can be + * applied to a Spark RDD so that we can easily interact with HBase + */ +object HBaseRDDFunctions +{ + + /** + * These are implicit methods for a RDD that contains any type of + * data. + * + * @param rdd This is for rdd of any type + * @tparam T This is any type + */ + implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) { + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * put. This will not return a new RDD. Think of it like a foreach + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param f The function that will turn the RDD values + * into HBase Put objects. + */ + def hbaseBulkPut(hc: HBaseContext, + tableName: TableName, + f: (T) => Put): Unit = { + hc.bulkPut(rdd, tableName, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new RDD. Think about it as a RDD map + * function. In that every RDD value will get a new value out of + * HBase. That new value will populate the newly generated RDD. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @param convertResult The function that will convert a HBase + * Result object into a value that will go + * into the resulting RDD + * @tparam R The type of Object that will be coming + * out of the resulting RDD + * @return A resulting RDD with type R objects + */ + def hbaseBulkGet[R: ClassTag](hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get, convertResult: (Result) => R): RDD[R] = { + hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new RDD. Think about it as a RDD map + * function. In that every RDD value will get a new value out of + * HBase. That new value will populate the newly generated RDD. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @return A resulting RDD with type R objects + */ + def hbaseBulkGet(hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = { + hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName, + batchSize, rdd, f, + result => if (result != null && result.getRow != null) { + (new ImmutableBytesWritable(result.getRow), result) + } else { + null + }) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * Delete. This will not return a new RDD. 
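+ *
+ * A minimal usage sketch, assuming an existing HBaseContext named hc and an
+ * RDD[Array[Byte]] of row keys named rowKeyRdd (hypothetical names; the
+ * rdd.HBaseBulkDeleteExample later in this patch follows the same pattern):
+ * {{{
+ * import org.apache.hadoop.hbase.TableName
+ * import org.apache.hadoop.hbase.client.Delete
+ * import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+ *
+ * rowKeyRdd.hbaseBulkDelete(hc, TableName.valueOf("t1"),
+ *   rowKey => new Delete(rowKey), 4)
+ * }}}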
+ * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param tableName The tableName that the deletes will be sent to + * @param f The function that will convert the RDD value into + * a HBase Delete Object + * @param batchSize The number of Deletes to be sent in a single batch + */ + def hbaseBulkDelete(hc: HBaseContext, + tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = { + hc.bulkDelete(rdd, tableName, f, batchSize) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * foreachPartition method. This will ack very much like a normal RDD + * foreach method but for the fact that you will now have a HBase connection + * while iterating through the values. + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * RDD along with a connection object to HBase + */ + def hbaseForeachPartition(hc: HBaseContext, + f: (Iterator[T], Connection) => Unit): Unit = { + hc.foreachPartition(rdd, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * mapPartitions method. This will ask very much like a normal RDD + * map partitions method but for the fact that you will now have a + * HBase connection while iterating through the values + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * RDD along with a connection object to HBase + * @tparam R This is the type of objects that will go into the resulting + * RDD + * @return A resulting RDD of type R + */ + def hbaseMapPartitions[R: ClassTag](hc: HBaseContext, + f: (Iterator[T], Connection) => Iterator[R]): + RDD[R] = { + hc.mapPartitions[T,R](rdd, f) + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala new file mode 100644 index 00000000000..48398923cc9 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.spark.api.java.JavaSparkContext +import org.apache.hadoop.conf.Configuration +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.function.VoidFunction +import org.apache.spark.api.java.function.Function +import org.apache.hadoop.hbase.client.Connection +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.api.java.function.FlatMapFunction +import scala.collection.JavaConversions._ +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import scala.reflect.ClassTag + +/** + * This is the Java Wrapper over HBaseContext which is written in + * Scala. This class will be used by developers that want to + * work with Spark or Spark Streaming in Java + * + * @param jsc This is the JavaSparkContext that we will wrap + * @param config This is the config information to out HBase cluster + */ +class JavaHBaseContext(@transient jsc: JavaSparkContext, + @transient config: Configuration) extends Serializable { + val hbaseContext = new HBaseContext(jsc.sc, config) + + /** + * A simple enrichment of the traditional Spark javaRdd foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaRdd Original javaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](javaRdd: JavaRDD[T], + f: VoidFunction[(java.util.Iterator[T], Connection)] ) = { + + hbaseContext.foreachPartition(javaRdd.rdd, + (it:Iterator[T], conn:Connection) => + { f.call((it, conn)) }) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaDstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](javaDstream: JavaDStream[T], + f: VoidFunction[(Iterator[T], Connection)]) = { + hbaseContext.foreachPartition(javaDstream.dstream, + (it:Iterator[T], conn: Connection) => f.call(it, conn)) + } + + /** + * A simple enrichment of the traditional Spark JavaRDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaRdd Original JavaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T,R](javaRdd: JavaRDD[T], + f: FlatMapFunction[(java.util.Iterator[T], + Connection),R] ): JavaRDD[R] = { + + def fn = (it: Iterator[T], conn: Connection) => + asScalaIterator( + f.call((asJavaIterator(it), conn)).iterator() + ) + + JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd, + (iterator:Iterator[T], connection:Connection) => + fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R]) + } + + /** + * A simple enrichment of the traditional Spark Streaming JavaDStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaDstream Original JavaDStream with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + * @return Returns a new JavaDStream generated by the user + * definition function just like normal mapPartition + */ + def streamMap[T, U](javaDstream: JavaDStream[T], + mp: Function[(Iterator[T], Connection), Iterator[U]]): + JavaDStream[U] = { + JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream, + (it: Iterator[T], conn: Connection) => + mp.call(it, conn) )(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take JavaRDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the JavaRDD + * to a HBase Put + */ + def bulkPut[T](javaDdd: JavaRDD[T], + tableName: TableName, + f: Function[(T), Put]) { + + hbaseContext.bulkPut(javaDdd.rdd, tableName, (t:T) => f.call(t)) + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a JavaDStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the JavaDStream to a HBase Put + */ + def streamBulkPut[T](javaDstream: JavaDStream[T], + tableName: TableName, + f: Function[T,Put]) = { + hbaseContext.streamBulkPut(javaDstream.dstream, + tableName, + (t:T) => f.call(t)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a JavaRDD and + * generate delete and send them to HBase. 
+ * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaRdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaRDD to a + * HBase Deletes + * @param batchSize The number of deletes to batch before sending to HBase + */ + def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName, + f: Function[T, Delete], batchSize:Integer) { + hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t:T) => f.call(t), batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. + * + * It allow addition support for a user to take a JavaDStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDstream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaDStream to a + * HBase Delete + * @param batchSize The number of deletes to be sent at once + */ + def streamBulkDelete[T](javaDstream: JavaDStream[T], + tableName: TableName, + f: Function[T, Delete], + batchSize: Integer) = { + hbaseContext.streamBulkDelete(javaDstream.dstream, tableName, + (t:T) => f.call(t), + batchSize) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a JavaRDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize batch size of how many gets to retrieve in a single fetch + * @param javaRdd Original JavaRDD with data to iterate over + * @param makeGet Function to convert a value in the JavaRDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * JavaRDD + * return new JavaRDD that is created by the Get to HBase + */ + def bulkGet[T, U](tableName: TableName, + batchSize:Integer, + javaRdd: JavaRDD[T], + makeGet: Function[T, Get], + convertResult: Function[Result, U]): JavaRDD[U] = { + + JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName, + batchSize, + javaRdd.rdd, + (t:T) => makeGet.call(t), + (r:Result) => {convertResult.call(r)})(fakeClassTag[U]))(fakeClassTag[U]) + + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + + * @param tableName The name of the table to get from + * @param batchSize The number of gets to be batched together + * @param javaDStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the JavaDStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * JavaDStream + * return new JavaDStream that is created by the Get to HBase + */ + def streamBulkGet[T, U](tableName:TableName, + batchSize:Integer, + javaDStream: JavaDStream[T], + makeGet: Function[T, Get], + convertResult: Function[Result, U]) { + JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName, + batchSize, + javaDStream.dstream, + (t:T) => makeGet.call(t), + (r:Result) => convertResult.call(r) )(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new JavaRDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @param f function to convert a Result object from HBase into + * what the user wants in the final generated JavaRDD + * @return new JavaRDD with results from scan + */ + def hbaseRDD[U](tableName: TableName, + scans: Scan, + f: Function[(ImmutableBytesWritable, Result), U]): + JavaRDD[U] = { + JavaRDD.fromRDD( + hbaseContext.hbaseRDD[U](tableName, + scans, + (v:(ImmutableBytesWritable, Result)) => + f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that define the + * type of the resulting JavaRDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @return New JavaRDD with results from scan + * + */ + def hbaseRDD(tableName: TableName, + scans: Scan): + JavaRDD[(ImmutableBytesWritable, Result)] = { + JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans)) + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as the Java compiler + * cannot produce them automatically. While this ClassTag-faking does please the compiler, + * it can cause problems at runtime if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, + * just worse performance or security issues. + * For instance, an Array[AnyRef] can hold any type T, + * but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala new file mode 100644 index 00000000000..f77721f147b --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Delete +import org.apache.spark.SparkConf + +/** + * This is a simple example of deleting records in HBase + * with the bulkDelete function. + */ +object HBaseBulkDeleteExample { + def main(args: Array[String]) { + if (args.length < 1) { + println("HBaseBulkDeletesExample {tableName} ") + return + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName) + val sc = new SparkContext(sparkConf) + try { + //[Array[Byte]] + val rdd = sc.parallelize(Array( + Bytes.toBytes("1"), + Bytes.toBytes("2"), + Bytes.toBytes("3"), + Bytes.toBytes("4"), + Bytes.toBytes("5") + )) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + hbaseContext.bulkDelete[Array[Byte]](rdd, + TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala new file mode 100644 index 00000000000..88f52fbbc9c --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Result +import org.apache.spark.SparkConf + +/** + * This is a simple example of getting records in HBase + * with the bulkGet function. 
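+ *
+ * For each row key in the input RDD, the convertResult function below renders the
+ * returned Result as a string of the form rowKey:(qualifier,value)(qualifier,value)...,
+ * and the driver prints those strings after collect().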
+ */ +object HBaseBulkGetExample { + def main(args: Array[String]) { + if (args.length < 1) { + println("HBaseBulkGetExample {tableName}") + return + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName) + val sc = new SparkContext(sparkConf) + + try { + + //[(Array[Byte])] + val rdd = sc.parallelize(Array( + Bytes.toBytes("1"), + Bytes.toBytes("2"), + Bytes.toBytes("3"), + Bytes.toBytes("4"), + Bytes.toBytes("5"), + Bytes.toBytes("6"), + Bytes.toBytes("7"))) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + TableName.valueOf(tableName), + 2, + rdd, + record => { + System.out.println("making Get") + new Get(record) + }, + (result: Result) => { + + val it = result.listCells().iterator() + val b = new StringBuilder + + b.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + b.toString() + }) + + getRdd.collect().foreach(v => println(v)) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala new file mode 100644 index 00000000000..735efeda33d --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.SparkConf + +/** + * This is a simple example of putting records in HBase + * with the bulkPut function. 
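+ *
+ * The input RDD holds (rowKey, Array[(family, qualifier, value)]) tuples, all as
+ * byte arrays; each tuple becomes a single Put with one addColumn call per
+ * (family, qualifier, value) entry.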
+ */ +object HBaseBulkPutExample { + def main(args: Array[String]) { + if (args.length < 2) { + println("HBaseBulkPutExample {tableName} {columnFamily}") + return + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + )) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => + put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }); + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala new file mode 100644 index 00000000000..3fd3006da2f --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.Text +import org.apache.spark.SparkConf + +/** + * This is a simple example of putting records in HBase + * with the bulkPut function. 
In this example we are + * getting the put information from a file + */ +object HBaseBulkPutExampleFromFile { + def main(args: Array[String]) { + if (args.length < 3) { + println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}") + return + } + + val tableName = args(0) + val columnFamily = args(1) + val inputFile = args(2) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " + + tableName + " " + columnFamily + " " + inputFile) + val sc = new SparkContext(sparkConf) + + try { + var rdd = sc.hadoopFile( + inputFile, + classOf[TextInputFormat], + classOf[LongWritable], + classOf[Text]).map(v => { + System.out.println("reading-" + v._2.toString) + v._2.toString + }) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + hbaseContext.bulkPut[String](rdd, + TableName.valueOf(tableName), + (putRecord) => { + System.out.println("hbase-" + putRecord) + val put = new Put(Bytes.toBytes("Value- " + putRecord)) + put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), + Bytes.toBytes(putRecord.length())) + put + }); + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala new file mode 100644 index 00000000000..ae92f373376 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.SparkConf + +/** + * This is a simple example of putting records in HBase + * with the bulkPut function. 
In this example we are + * also setting the timestamp in the put + */ +object HBaseBulkPutTimestampExample { + def main(args: Array[String]) { + if (args.length < 2) { + System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily}") + return + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " + + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + + val rdd = sc.parallelize(Array( + (Bytes.toBytes("6"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("7"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("8"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("9"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("10"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))))) + + val conf = HBaseConfiguration.create() + + val timeStamp = System.currentTimeMillis() + + val hbaseContext = new HBaseContext(sc, conf) + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, + timeStamp, putValue._3)) + put + }) + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala new file mode 100644 index 00000000000..852b19811ab --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Scan +import org.apache.spark.SparkConf +/** + * This is a simple example of scanning records from HBase + * with the hbaseRDD function. 
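+ *
+ * Each element of the RDD produced by hbaseRDD is an (ImmutableBytesWritable, Result)
+ * pair, so the row key is available through _1 and the cells through _2; the example
+ * below prints the row keys and then the number of rows returned by the scan.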
+ */
+object HBaseDistributedScanExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseDistributedScanExample {tableName}")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val scan = new Scan()
+      scan.setCaching(100)
+
+      val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
+
+      getRdd.foreach(v => println(Bytes.toString(v._1.get())))
+
+      println("Length: " + getRdd.map(r => r._1.copyBytes()).collect().length)
+
+      //.collect().foreach(v => println(Bytes.toString(v._1.get())))
+    } finally {
+      sc.stop()
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
new file mode 100644
index 00000000000..29afa49897a
--- /dev/null
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.spark.example.hbasecontext + +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.spark.SparkContext +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.Seconds +import org.apache.spark.SparkConf + +/** + * This is a simple example of BulkPut with Spark Streaming + */ +object HBaseStreamingBulkPutExample { + def main(args: Array[String]) { + if (args.length < 4) { + println("HBaseStreamingBulkPutExample " + + "{host} {port} {tableName} {columnFamily}") + return + } + + val host = args(0) + val port = args(1) + val tableName = args(2) + val columnFamily = args(3) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " + + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + try { + val ssc = new StreamingContext(sc, Seconds(1)) + + val lines = ssc.socketTextStream(host, port.toInt) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + + hbaseContext.streamBulkPut[String](lines, + TableName.valueOf(tableName), + (putRecord) => { + if (putRecord.length() > 0) { + val put = new Put(Bytes.toBytes(putRecord)) + put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar")) + put + } else { + null + } + }) + ssc.start() + ssc.awaitTerminationOrTimeout(60000) + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala new file mode 100644 index 00000000000..b8f40a8517b --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes + +import org.apache.spark.{SparkContext, SparkConf} + +/** + * This is a simple example of deleting records in HBase + * with the bulkDelete function. 
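+ *
+ * Unlike the hbasecontext examples above, the examples in this rdd package import
+ * HBaseRDDFunctions._ and call the bulk operations directly on the RDD
+ * (for example rdd.hbaseBulkDelete(...)), passing the HBaseContext in as an argument.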
+ */ +object HBaseBulkDeleteExample { + def main(args: Array[String]) { + if (args.length < 1) { + println("HBaseBulkDeletesExample {tableName} ") + return + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName) + val sc = new SparkContext(sparkConf) + try { + //[Array[Byte]] + val rdd = sc.parallelize(Array( + Bytes.toBytes("1"), + Bytes.toBytes("2"), + Bytes.toBytes("3"), + Bytes.toBytes("4"), + Bytes.toBytes("5") + )) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + + rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + + } finally { + sc.stop() + } + } +} \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala new file mode 100644 index 00000000000..9d59e967c47 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.hbase.client.{Result, Get} +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.spark.{SparkContext, SparkConf} + +/** + * This is a simple example of getting records in HBase + * with the bulkGet function. 
+ */ +object HBaseBulkGetExample { + def main(args: Array[String]) { + if (args.length < 1) { + println("HBaseBulkGetExample {tableName}") + return + } + + val tableName = args(0) + + val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName) + val sc = new SparkContext(sparkConf) + + try { + + //[(Array[Byte])] + val rdd = sc.parallelize(Array( + Bytes.toBytes("1"), + Bytes.toBytes("2"), + Bytes.toBytes("3"), + Bytes.toBytes("4"), + Bytes.toBytes("5"), + Bytes.toBytes("6"), + Bytes.toBytes("7"))) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + + val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2, + record => { + System.out.println("making Get") + new Get(record) + }, + (result: Result) => { + + val it = result.listCells().iterator() + val b = new StringBuilder + + b.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + b.toString() + }) + + getRdd.collect().foreach(v => println(v)) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala new file mode 100644 index 00000000000..2d07e894f9f --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} +import org.apache.spark.{SparkConf, SparkContext} + +/** + * This is a simple example of putting records in HBase + * with the bulkPut function. 
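+ *
+ * The sample data gives each of the rows "1" through "5" a single cell in the
+ * supplied column family, with qualifier "1" and a value equal to the row key.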
+ */ +object HBaseBulkPutExample { + def main(args: Array[String]) { + if (args.length < 2) { + println("HBaseBulkPutExample {tableName} {columnFamily}") + return + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + )) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + + rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, + putValue._3)) + put + }) + + } finally { + sc.stop() + } + } + } \ No newline at end of file diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala new file mode 100644 index 00000000000..e2ad22433ee --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark.example.rdd + +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} +import org.apache.hadoop.hbase.spark.HBaseContext +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.{SparkContext, SparkConf} + +/** + * This is a simple example of using the foreachPartition + * method with a HBase connection + */ +object HBaseForeachPartitionExample { + def main(args: Array[String]) { + if (args.length < 2) { + println("HBaseBulkPutExample {tableName} {columnFamily}") + return + } + + val tableName = args(0) + val columnFamily = args(1) + + val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + + tableName + " " + columnFamily) + val sc = new SparkContext(sparkConf) + + try { + //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])] + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) + )) + + val conf = HBaseConfiguration.create() + + val hbaseContext = new HBaseContext(sc, conf) + + + rdd.hbaseForeachPartition(hbaseContext, + (it, connection) => { + val m = connection.getBufferedMutator(TableName.valueOf(tableName)) + + it.foreach(r => { + val put = new Put(r._1) + r._2.foreach((putValue) => + put.addColumn(putValue._1, putValue._2, putValue._3)) + m.mutate(put) + }) + m.flush() + m.close() + }) + + } finally { + sc.stop() + } + } +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala new file mode 100644 index 00000000000..bc444bee0f5 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hbase.spark.example.rdd
+
+import org.apache.hadoop.hbase.client.Get
+import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.{SparkContext, SparkConf}
+
+/**
+ * This is a simple example of using the mapPartitions
+ * method with an HBase connection
+ */
+object HBaseMapPartitionExample {
+  def main(args: Array[String]) {
+    if (args.length < 1) {
+      println("HBaseMapPartitionExample {tableName}")
+      return
+    }
+
+    val tableName = args(0)
+
+    val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
+    val sc = new SparkContext(sparkConf)
+
+    try {
+
+      //[(Array[Byte])]
+      val rdd = sc.parallelize(Array(
+        Bytes.toBytes("1"),
+        Bytes.toBytes("2"),
+        Bytes.toBytes("3"),
+        Bytes.toBytes("4"),
+        Bytes.toBytes("5"),
+        Bytes.toBytes("6"),
+        Bytes.toBytes("7")))
+
+      val conf = HBaseConfiguration.create()
+
+      val hbaseContext = new HBaseContext(sc, conf)
+
+      val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
+        val table = connection.getTable(TableName.valueOf(tableName))
+        it.map{r =>
+          //batching would be faster. This is just an example
+          val result = table.get(new Get(r))
+
+          val cellIt = result.listCells().iterator()
+          val b = new StringBuilder
+
+          b.append(Bytes.toString(result.getRow) + ":")
+
+          while (cellIt.hasNext) {
+            val cell = cellIt.next()
+            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+            if (q.equals("counter")) {
+              b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
+            } else {
+              b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
+            }
+          }
+          b.toString()
+        }
+      })
+
+      getRdd.collect().foreach(v => println(v))
+
+    } finally {
+      sc.stop()
+    }
+  }
+}
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
new file mode 100644
index 00000000000..f19ad10200e
--- /dev/null
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/JavaHBaseContextSuite.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.spark; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.api.java.*; +import org.apache.spark.api.java.function.Function; +import org.junit.*; + +import scala.Tuple2; + +import com.google.common.io.Files; + +public class JavaHBaseContextSuite implements Serializable { + private transient JavaSparkContext jsc; + HBaseTestingUtility htu; + protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class); + + + byte[] tableName = Bytes.toBytes("t1"); + byte[] columnFamily = Bytes.toBytes("c"); + String columnFamilyStr = Bytes.toString(columnFamily); + + @Before + public void setUp() { + jsc = new JavaSparkContext("local", "JavaHBaseContextSuite"); + jsc.addJar("spark.jar"); + + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + + htu = HBaseTestingUtility.createLocalHTU(); + try { + LOG.info("cleaning up test dir"); + + htu.cleanupTestDir(); + + LOG.info("starting minicluster"); + + htu.startMiniZKCluster(); + htu.startMiniHBaseCluster(1, 1); + + LOG.info(" - minicluster started"); + + try { + htu.deleteTable(TableName.valueOf(tableName)); + } catch (Exception e) { + LOG.info(" - no table " + Bytes.toString(tableName) + " found"); + } + + LOG.info(" - creating table " + Bytes.toString(tableName)); + htu.createTable(TableName.valueOf(tableName), + columnFamily); + LOG.info(" - created table"); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } + + @After + public void tearDown() { + try { + htu.deleteTable(TableName.valueOf(tableName)); + LOG.info("shuting down minicluster"); + htu.shutdownMiniHBaseCluster(); + htu.shutdownMiniZKCluster(); + LOG.info(" - minicluster shut down"); + htu.cleanupTestDir(); + } catch (Exception e) { + throw new RuntimeException(e); + } + jsc.stop(); + jsc = null; + } + + @Test + public void testBulkPut() throws IOException { + + List list = new ArrayList<>(); + list.add("1," + columnFamilyStr + ",a,1"); + list.add("2," + columnFamilyStr + ",a,2"); + list.add("3," + columnFamilyStr + ",a,3"); + list.add("4," + columnFamilyStr + ",a,4"); + list.add("5," + columnFamilyStr + ",a,5"); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = htu.getConfiguration(); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(TableName.valueOf(tableName)); + + try { + List deletes = new ArrayList<>(); + for (int i = 1; i < 6; i++) { + deletes.add(new Delete(Bytes.toBytes(Integer.toString(i)))); + } + table.delete(deletes); + } 
finally { + table.close(); + } + + hbaseContext.bulkPut(rdd, + TableName.valueOf(tableName), + new PutFunction()); + + table = conn.getTable(TableName.valueOf(tableName)); + + try { + Result result1 = table.get(new Get(Bytes.toBytes("1"))); + Assert.assertNotNull("Row 1 should have been put", result1.getRow()); + + Result result2 = table.get(new Get(Bytes.toBytes("2"))); + Assert.assertNotNull("Row 2 should have been put", result2.getRow()); + + Result result3 = table.get(new Get(Bytes.toBytes("3"))); + Assert.assertNotNull("Row 3 should have been put", result3.getRow()); + + Result result4 = table.get(new Get(Bytes.toBytes("4"))); + Assert.assertNotNull("Row 4 should have been put", result4.getRow()); + + Result result5 = table.get(new Get(Bytes.toBytes("5"))); + Assert.assertNotNull("Row 5 should have been put", result5.getRow()); + } finally { + table.close(); + conn.close(); + } + } + + public static class PutFunction implements Function<String, Put> { + + private static final long serialVersionUID = 1L; + + public Put call(String v) throws Exception { + String[] cells = v.split(","); + Put put = new Put(Bytes.toBytes(cells[0])); + + put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]), + Bytes.toBytes(cells[3])); + return put; + } + } + + @Test + public void testBulkDelete() throws IOException { + List<byte[]> list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + + JavaRDD<byte[]> rdd = jsc.parallelize(list); + + Configuration conf = htu.getConfiguration(); + + populateTableWithMockData(conf, TableName.valueOf(tableName)); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName), + new JavaHBaseBulkDeleteExample.DeleteFunction(), 2); + + try ( + Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(TableName.valueOf(tableName)) + ) { + Result result1 = table.get(new Get(Bytes.toBytes("1"))); + Assert.assertNull("Row 1 should have been deleted", result1.getRow()); + + Result result2 = table.get(new Get(Bytes.toBytes("2"))); + Assert.assertNull("Row 2 should have been deleted", result2.getRow()); + + Result result3 = table.get(new Get(Bytes.toBytes("3"))); + Assert.assertNull("Row 3 should have been deleted", result3.getRow()); + + Result result4 = table.get(new Get(Bytes.toBytes("4"))); + Assert.assertNotNull("Row 4 should not have been deleted", result4.getRow()); + + Result result5 = table.get(new Get(Bytes.toBytes("5"))); + Assert.assertNotNull("Row 5 should not have been deleted", result5.getRow()); + } + } + + @Test + public void testDistributedScan() throws IOException { + Configuration conf = htu.getConfiguration(); + + populateTableWithMockData(conf, TableName.valueOf(tableName)); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + Scan scan = new Scan(); + scan.setCaching(100); + + JavaRDD<String> javaRdd = + hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan) + .map(new ScanConvertFunction()); + + List<String> results = javaRdd.collect(); + + Assert.assertEquals(5, results.size()); + } + + private static class ScanConvertFunction implements + Function<Tuple2<ImmutableBytesWritable, Result>, String> { + @Override + public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception { + return Bytes.toString(v1._1().copyBytes()); + } + } + + @Test + public void testBulkGet() throws IOException { + List<byte[]> list = new ArrayList<>(); + list.add(Bytes.toBytes("1")); + list.add(Bytes.toBytes("2")); + list.add(Bytes.toBytes("3")); + list.add(Bytes.toBytes("4")); +
list.add(Bytes.toBytes("5")); + + JavaRDD rdd = jsc.parallelize(list); + + Configuration conf = htu.getConfiguration(); + + populateTableWithMockData(conf, TableName.valueOf(tableName)); + + JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); + + final JavaRDD stringJavaRDD = + hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, + new GetFunction(), + new ResultFunction()); + + Assert.assertEquals(stringJavaRDD.count(), 5); + } + + public static class GetFunction implements Function { + + private static final long serialVersionUID = 1L; + + public Get call(byte[] v) throws Exception { + return new Get(v); + } + } + + public static class ResultFunction implements Function { + + private static final long serialVersionUID = 1L; + + public String call(Result result) throws Exception { + Iterator it = result.listCells().iterator(); + StringBuilder b = new StringBuilder(); + + b.append(Bytes.toString(result.getRow())).append(":"); + + while (it.hasNext()) { + Cell cell = it.next(); + String q = Bytes.toString(CellUtil.cloneQualifier(cell)); + if ("counter".equals(q)) { + b.append("(") + .append(q) + .append(",") + .append(Bytes.toLong(CellUtil.cloneValue(cell))) + .append(")"); + } else { + b.append("(") + .append(q) + .append(",") + .append(Bytes.toString(CellUtil.cloneValue(cell))) + .append(")"); + } + } + return b.toString(); + } + } + + private void populateTableWithMockData(Configuration conf, TableName tableName) + throws IOException { + try ( + Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName)) { + + List puts = new ArrayList<>(); + + for (int i = 1; i < 6; i++) { + Put put = new Put(Bytes.toBytes(Integer.toString(i))); + put.addColumn(columnFamily, columnFamily, columnFamily); + puts.add(put); + } + table.put(puts); + } + } + +} \ No newline at end of file diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala new file mode 100644 index 00000000000..b27cfc72f64 --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility} +import org.apache.spark.{SparkException, Logging, SparkContext} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +class HBaseContextSuite extends FunSuite with +BeforeAndAfterEach with BeforeAndAfterAll with Logging { + + @transient var sc: SparkContext = null + var TEST_UTIL = new HBaseTestingUtility + + val tableName = "t1" + val columnFamily = "c" + + override def beforeAll() { + TEST_UTIL.startMiniCluster() + logInfo(" - minicluster started") + + try { + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + } catch { + case e: Exception => + logInfo(" - no table " + tableName + " found") + } + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + + val envMap = Map[String,String](("Xmx", "512m")) + + sc = new SparkContext("local", "test", null, Nil, envMap) + } + + override def afterAll() { + logInfo("shutting down minicluster") + TEST_UTIL.shutdownMiniCluster() + logInfo(" - minicluster shut down") + TEST_UTIL.cleanupTestDir() + sc.stop() + } + + test("bulkput to test HBase client") { + val config = TEST_UTIL.getConfiguration + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config) + hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }) + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1 == "foo1") + + val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))) + assert(foo2 == "foo2") + + val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))) + assert(foo3 == "foo3") + + val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))) + assert(foo4 == "foo") + + val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). 
+ getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))) + assert(foo5 == "bar") + + } finally { + table.close() + connection.close() + } + } + + test("bulkDelete to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("delete1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("delete2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("delete3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + + val rdd = sc.parallelize(Array( + Bytes.toBytes("delete1"), + Bytes.toBytes("delete3"))) + + val hbaseContext = new HBaseContext(sc, config) + hbaseContext.bulkDelete[Array[Byte]](rdd, + TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + + assert(table.get(new Get(Bytes.toBytes("delete1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(table.get(new Get(Bytes.toBytes("delete3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2")) + } finally { + table.close() + connection.close() + } + } + + test("bulkGet to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + TableName.valueOf(tableName), + 2, + rdd, + record => { + new Get(record) + }, + (result: Result) => { + if (result.listCells() != null) { + val it = result.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + "" + B.toString + } else { + "" + } + }) + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + + } + + test("BulkGet failure test: bad table") { + val config = TEST_UTIL.getConfiguration + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + 
Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + intercept[SparkException] { + try { + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + TableName.valueOf("badTableName"), + 2, + rdd, + record => { + new Get(record) + }, + (result: Result) => "1") + + getRdd.collect() + + fail("We should have failed and not reached this line") + } catch { + case ex: SparkException => { + assert( + ex.getMessage.contains( + "org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException")) + throw ex + } + } + } + } + + test("BulkGet failure test: bad column") { + + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + val getRdd = hbaseContext.bulkGet[Array[Byte], String]( + TableName.valueOf(tableName), + 2, + rdd, + record => { + new Get(record) + }, + (result: Result) => { + if (result.listCells() != null) { + val cellValue = result.getColumnLatestCell( + Bytes.toBytes("c"), Bytes.toBytes("bad_column")) + if (cellValue == null) "null" else "bad" + } else "noValue" + }) + var nullCounter = 0 + var noValueCounter = 0 + getRdd.collect().foreach(r => { + if ("null".equals(r)) nullCounter += 1 + else if ("noValue".equals(r)) noValueCounter += 1 + }) + assert(nullCounter == 3) + assert(noValueCounter == 1) + } + + test("distributedScan to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("scan1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("scan2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("scan3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + put = new Put(Bytes.toBytes("scan4")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + put = new Put(Bytes.toBytes("scan5")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val hbaseContext = new HBaseContext(sc, config) + + val scan = new Scan() + scan.setCaching(100) + scan.setStartRow(Bytes.toBytes("scan2")) + scan.setStopRow(Bytes.toBytes("scan4_")) + + val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan) + + try { + val scanList = scanRdd.map(r => r._1.copyBytes()).collect() + assert(scanList.length == 3) + } catch { + case ex: Exception => ex.printStackTrace() + } + } +} diff --git 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala new file mode 100644 index 00000000000..007aa84f943 --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext} +import org.apache.spark.{SparkContext, Logging} +import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._ +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +import scala.collection.mutable + +class HBaseDStreamFunctionsSuite extends FunSuite with +BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + val tableName = "t1" + val columnFamily = "c" + + override def beforeAll() { + TEST_UTIL.startMiniCluster() + + logInfo(" - minicluster started") + try + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + catch { + case e: Exception => logInfo(" - no table " + tableName + " found") + + } + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + + sc = new SparkContext("local", "test") + } + + override def afterAll() { + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + TEST_UTIL.shutdownMiniCluster() + sc.stop() + } + + test("bulkput to test HBase client") { + val config = TEST_UTIL.getConfiguration + val rdd1 = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))))) + + val rdd2 = sc.parallelize(Array( + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config) + val ssc = new StreamingContext(sc, Milliseconds(200)) + + val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte], + Array[Byte], Array[Byte])])]]() + queue += rdd1 + queue += rdd2 + val dStream = ssc.queueStream(queue) + + dStream.hbaseBulkPut( + 
hbaseContext, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }) + + ssc.start() + + ssc.awaitTerminationOrTimeout(1000) + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1 == "foo1") + + val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))) + assert(foo2 == "foo2") + + val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))) + assert(foo3 == "foo3") + + val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))) + assert(foo4 == "foo") + + val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))) + assert(foo5 == "bar") + } finally { + table.close() + connection.close() + } + } + +} \ No newline at end of file diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala new file mode 100644 index 00000000000..89148c39a5d --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility} +import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ +import org.apache.spark.{Logging, SparkContext} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +import scala.collection.mutable + +class HBaseRDDFunctionsSuite extends FunSuite with +BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + val tableName = "t1" + val columnFamily = "c" + + override def beforeAll() { + + TEST_UTIL.startMiniCluster + + logInfo(" - minicluster started") + try + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + catch { + case e: Exception => logInfo(" - no table " + tableName + " found") + + } + logInfo(" - creating table " + tableName) + TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + + sc = new SparkContext("local", "test") + } + + override def afterAll() { + TEST_UTIL.deleteTable(TableName.valueOf(tableName)) + logInfo("shutting down minicluster") + TEST_UTIL.shutdownMiniCluster() + + sc.stop() + } + + test("bulkput to test HBase client") { + val config = TEST_UTIL.getConfiguration + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config) + + rdd.hbaseBulkPut( + hbaseContext, + TableName.valueOf(tableName), + (putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + put + }) + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1 == "foo1") + + val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))) + assert(foo2 == "foo2") + + val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))) + assert(foo3 == "foo3") + + val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))) + assert(foo4 == "foo") + + val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). 
+ getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))) + assert(foo5 == "bar") + } finally { + table.close() + connection.close() + } + } + + test("bulkDelete to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("delete1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("delete2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("delete3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + + val rdd = sc.parallelize(Array( + Bytes.toBytes("delete1"), + Bytes.toBytes("delete3"))) + + val hbaseContext = new HBaseContext(sc, config) + + rdd.hbaseBulkDelete(hbaseContext, + TableName.valueOf(tableName), + putRecord => new Delete(putRecord), + 4) + + assert(table.get(new Get(Bytes.toBytes("delete1"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(table.get(new Get(Bytes.toBytes("delete3"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null) + assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2")) + } finally { + table.close() + connection.close() + } + + } + + test("bulkGet to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + //Get with custom convert logic + val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2, + record => { + new Get(record) + }, + (result: Result) => { + if (result.listCells() != null) { + val it = result.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + "" + B.toString + } else { + "" + } + }) + + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + } + + test("bulkGet default converter to test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = 
connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + val getRdd = rdd.hbaseBulkGet(hbaseContext, TableName.valueOf("t1"), 2, + record => { + new Get(record) + }).map((row) => { + if (row != null && row._2.listCells() != null) { + val it = row._2.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(row._2.getRow) + ":") + + while (it.hasNext) { + val cell = it.next + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + "" + B.toString + } else { + "" + }}) + + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + } + + test("foreachPartition with puts to test HBase client") { + val config = TEST_UTIL.getConfiguration + val rdd = sc.parallelize(Array( + (Bytes.toBytes("1foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), + (Bytes.toBytes("2foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))), + (Bytes.toBytes("3foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))), + (Bytes.toBytes("4foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))), + (Bytes.toBytes("5foreach"), + Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar")))))) + + val hbaseContext = new HBaseContext(sc, config) + + rdd.hbaseForeachPartition(hbaseContext, (it, conn) => { + val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1")) + it.foreach((putRecord) => { + val put = new Put(putRecord._1) + putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) + bufferedMutator.mutate(put) + }) + bufferedMutator.flush() + bufferedMutator.close() + }) + + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))) + assert(foo1 == "foo1") + + val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b")))) + assert(foo2 == "foo2") + + val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3foreach"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c")))) + assert(foo3 == "foo3") + + val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4foreach"))). 
+ getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d")))) + assert(foo4 == "foo") + + val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))). + getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e")))) + assert(foo5 == "bar") + } finally { + table.close() + connection.close() + } + } + + test("mapPartitions with Get from test HBase client") { + val config = TEST_UTIL.getConfiguration + val connection = ConnectionFactory.createConnection(config) + val table = connection.getTable(TableName.valueOf("t1")) + + try { + var put = new Put(Bytes.toBytes("get1")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")) + table.put(put) + put = new Put(Bytes.toBytes("get2")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2")) + table.put(put) + put = new Put(Bytes.toBytes("get3")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3")) + table.put(put) + } finally { + table.close() + connection.close() + } + + val rdd = sc.parallelize(Array( + Bytes.toBytes("get1"), + Bytes.toBytes("get2"), + Bytes.toBytes("get3"), + Bytes.toBytes("get4"))) + val hbaseContext = new HBaseContext(sc, config) + + //Get with custom convert logic + val getRdd = rdd.hbaseMapPartitions(hbaseContext, (it, conn) => { + val table = conn.getTable(TableName.valueOf("t1")) + var res = mutable.MutableList[String]() + + it.foreach(r => { + val get = new Get(r) + val result = table.get(get) + if (result.listCells != null) { + val it = result.listCells().iterator() + val B = new StringBuilder + + B.append(Bytes.toString(result.getRow) + ":") + + while (it.hasNext) { + val cell = it.next() + val q = Bytes.toString(CellUtil.cloneQualifier(cell)) + if (q.equals("counter")) { + B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")") + } else { + B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")") + } + } + res += "" + B.toString + } else { + res += "" + } + }) + res.iterator + }) + + val getArray = getRdd.collect() + + assert(getArray.length == 4) + assert(getArray.contains("get1:(a,foo1)")) + assert(getArray.contains("get2:(a,foo2)")) + assert(getArray.contains("get3:(a,foo3)")) + } +} diff --git a/pom.xml b/pom.xml index 6e9b229159b..bc883a33ef4 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ hbase-rest hbase-checkstyle hbase-shaded + hbase-spark