HBASE-13992 Integrate SparkOnHBase into HBase

Signed-off-by: Sean Busbey <busbey@cloudera.com>
Ted Malaska 2015-07-28 11:10:37 -05:00 committed by Sean Busbey
parent 6b9b7cb8c7
commit 30f7d127c3
29 changed files with 4497 additions and 1 deletion


@@ -21,7 +21,8 @@ MAVEN_OPTS="${MAVEN_OPTS:-"-Xmx3100M"}"
 OK_RELEASEAUDIT_WARNINGS=0
 # Allow four warnings. Javadoc complains about sun.misc.Unsafe use.
 # See HBASE-7457, HBASE-13761
-OK_JAVADOC_WARNINGS=7
+# Allow 2 additional warnings for Scala stub notice about MR. See HBASE-13992
+OK_JAVADOC_WARNINGS=9
 MAX_LINE_LENGTH=100

hbase-spark/pom.xml (new file)

@@ -0,0 +1,572 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase</artifactId>
<groupId>org.apache.hbase</groupId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>hbase-spark</artifactId>
<name>Apache HBase - Spark</name>
<properties>
<spark.version>1.3.0</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<top.dir>${project.basedir}/..</top.dir>
</properties>
<dependencies>
<!-- Force import of Spark's servlet API for unit tests -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<!-- Mark Spark / Scala as provided -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<!-- make sure wrong scala version is not pulled in -->
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<!-- make sure wrong scala version is not pulled in -->
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>2.2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalamock</groupId>
<artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
<version>3.1.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-two.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-two.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-two.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop-two.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>thrift</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>thrift</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>thrift</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-it</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<charset>${project.build.sourceEncoding}</charset>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
<parallel>false</parallel>
</configuration>
<executions>
<execution>
<id>test</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<skipTests>true</skipTests>
</configuration>
</execution>
<execution>
<id>integration-test</id>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<tagsToExclude>Integration-Test</tagsToExclude>
<argLine>
-Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
</argLine>
<parallel>false</parallel>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.List;
/**
* This is a simple example of deleting records in HBase
* with the bulkDelete function.
*/
final public class JavaHBaseBulkDeleteExample {
private JavaHBaseBulkDeleteExample() {}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("JavaHBaseBulkDeleteExample {tableName}");
return;
}
String tableName = args[0];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkDelete(rdd,
TableName.valueOf(tableName), new DeleteFunction(), 4);
} finally {
jsc.stop();
}
}
public static class DeleteFunction implements Function<byte[], Delete> {
private static final long serialVersionUID = 1L;
public Delete call(byte[] v) throws Exception {
return new Delete(v);
}
}
}


@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
* This is a simple example of getting records in HBase
* with the bulkGet function.
*/
final public class JavaHBaseBulkGetExample {
private JavaHBaseBulkGetExample() {}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("JavaHBaseBulkGetExample {tableName}");
return;
}
String tableName = args[0];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(),
new ResultFunction());
} finally {
jsc.stop();
}
}
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
public Get call(byte[] v) throws Exception {
return new Get(v);
}
}
public static class ResultFunction implements Function<Result, String> {
private static final long serialVersionUID = 1L;
public String call(Result result) throws Exception {
Iterator<Cell> it = result.listCells().iterator();
StringBuilder b = new StringBuilder();
b.append(Bytes.toString(result.getRow())).append(":");
while (it.hasNext()) {
Cell cell = it.next();
// Use CellUtil to copy only the qualifier/value bytes; the raw backing arrays
// returned by getQualifierArray()/getValueArray() are not bounded to this cell.
String q = Bytes.toString(CellUtil.cloneQualifier(cell));
if (q.equals("counter")) {
b.append("(")
.append(q)
.append(",")
.append(Bytes.toLong(CellUtil.cloneValue(cell)))
.append(")");
} else {
b.append("(")
.append(q)
.append(",")
.append(Bytes.toString(CellUtil.cloneValue(cell)))
.append(")");
}
}
return b.toString();
}
}
}


@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
* This is a simple example of putting records in HBase
* with the bulkPut function.
*/
final public class JavaHBaseBulkPutExample {
private JavaHBaseBulkPutExample() {}
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("JavaHBaseBulkPutExample " +
"{tableName} {columnFamily}");
return;
}
String tableName = args[0];
String columnFamily = args[1];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<String> list = new ArrayList<>();
list.add("1," + columnFamily + ",a,1");
list.add("2," + columnFamily + ",a,2");
list.add("3," + columnFamily + ",a,3");
list.add("4," + columnFamily + ",a,4");
list.add("5," + columnFamily + ",a,5");
JavaRDD<String> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkPut(rdd,
TableName.valueOf(tableName),
new PutFunction());
} finally {
jsc.stop();
}
}
public static class PutFunction implements Function<String, Put> {
private static final long serialVersionUID = 1L;
public Put call(String v) throws Exception {
String[] cells = v.split(",");
Put put = new Put(Bytes.toBytes(cells[0]));
put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
Bytes.toBytes(cells[3]));
return put;
}
}
}


@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
/**
* This is a simple example of scanning records from HBase
* with the hbaseRDD function.
*/
final public class JavaHBaseDistributedScan {
private JavaHBaseDistributedScan() {}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("JavaHBaseDistributedScan {tableName}");
return;
}
String tableName = args[0];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
Scan scan = new Scan();
scan.setCaching(100);
JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);
List<String> results = javaRdd.map(new ScanConvertFunction()).collect();
System.out.println("Result Size: " + results.size());
} finally {
jsc.stop();
}
}
private static class ScanConvertFunction implements
Function<Tuple2<ImmutableBytesWritable, Result>, String> {
@Override
public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
return Bytes.toString(v1._1().copyBytes());
}
}
}


@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
* This is a simple example of using the foreachPartition
* method with an HBase connection.
*/
final public class JavaHBaseMapGetPutExample {
private JavaHBaseMapGetPutExample() {}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("JavaHBaseMapGetPutExample {tableName}");
return;
}
final String tableName = args[0];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseMapGetPutExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.foreachPartition(rdd,
new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
public void call(Tuple2<Iterator<byte[]>, Connection> t)
throws Exception {
Table table = t._2().getTable(TableName.valueOf(tableName));
BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
while (t._1().hasNext()) {
byte[] b = t._1().next();
Result r = table.get(new Get(b));
if (r.getExists()) {
mutator.mutate(new Put(b));
}
}
mutator.flush();
mutator.close();
table.close();
}
});
} finally {
jsc.stop();
}
}
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
public Get call(byte[] v) throws Exception {
return new Get(v);
}
}
}


@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* This is a simple example of BulkPut with Spark Streaming
*/
final public class JavaHBaseStreamingBulkPutExample {
private JavaHBaseStreamingBulkPutExample() {}
public static void main(String[] args) {
if (args.length < 3) {
System.out.println("JavaHBaseStreamingBulkPutExample " +
"{host} {port} {tableName}");
return;
}
String host = args[0];
String port = args[1];
String tableName = args[2];
SparkConf sparkConf =
new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
host + ":" + port + ":" + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
JavaStreamingContext jssc =
new JavaStreamingContext(jsc, new Duration(1000));
JavaReceiverInputDStream<String> javaDstream =
jssc.socketTextStream(host, Integer.parseInt(port));
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.streamBulkPut(javaDstream,
TableName.valueOf(tableName),
new PutFunction());
// Start the streaming job; without start() the DStream is never processed.
jssc.start();
jssc.awaitTermination();
} finally {
jsc.stop();
}
}
public static class PutFunction implements Function<String, Put> {
private static final long serialVersionUID = 1L;
public Put call(String v) throws Exception {
String[] part = v.split(",");
Put put = new Put(Bytes.toBytes(part[0]));
put.addColumn(Bytes.toBytes(part[1]),
Bytes.toBytes(part[2]),
Bytes.toBytes(part[3]));
return put;
}
}
}


@@ -0,0 +1,570 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.TableName
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import scala.reflect.ClassTag
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Delete
import org.apache.spark.{Logging, SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Mutation
import org.apache.spark.streaming.dstream.DStream
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
import org.apache.hadoop.fs.{Path, FileSystem}
/**
* HBaseContext is a façade for HBase operations
* like bulk put, get, increment, delete, and scan
*
* HBaseContext will take the responsibilities
* of disseminating the configuration information
* to the worker nodes and of managing the life cycle of Connections.
*/
class HBaseContext(@transient sc: SparkContext,
@transient config: Configuration,
val tmpHdfsConfgFile: String = null)
extends Serializable with Logging {
@transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
@transient var tmpHdfsConfiguration:Configuration = config
@transient var appliedCredentials = false
@transient val job = Job.getInstance(config)
TableMapReduceUtil.initCredentials(job)
val broadcastedConf = sc.broadcast(new SerializableWritable(config))
val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
if (tmpHdfsConfgFile != null && config != null) {
val fs = FileSystem.newInstance(config)
val tmpPath = new Path(tmpHdfsConfgFile)
if (!fs.exists(tmpPath)) {
val outputStream = fs.create(tmpPath)
config.write(outputStream)
outputStream.close()
} else {
logWarning("tmpHdfsConfgFile " + tmpHdfsConfgFile + " already exists")
}
}
/**
* A simple enrichment of the traditional Spark RDD foreachPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param rdd Original RDD with data to iterate over
* @param f Function to be given an iterator to iterate through
* the RDD values and a Connection object to interact
* with HBase
*/
def foreachPartition[T](rdd: RDD[T],
f: (Iterator[T], Connection) => Unit):Unit = {
rdd.foreachPartition(
it => hbaseForeachPartition(broadcastedConf, it, f))
}
/**
* A simple enrichment of the traditional Spark Streaming DStream foreach.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the DStream values and a Connection object to
* interact with HBase
*/
def foreachPartition[T](dstream: DStream[T],
f: (Iterator[T], Connection) => Unit):Unit = {
dstream.foreachRDD((rdd, time) => {
foreachPartition(rdd, f)
})
}
/**
* A simple enrichment of the traditional Spark RDD mapPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param rdd Original RDD with data to iterate over
* @param mp Function to be given an iterator to iterate through
* the RDD values and a Connection object to interact
* with HBase
* @return Returns a new RDD generated by the user-defined
* function, just like a normal mapPartition
*/
def mapPartitions[T, R: ClassTag](rdd: RDD[T],
mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
it,
mp))
}
/**
* A simple enrichment of the traditional Spark Streaming DStream
* foreachPartition.
*
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the DStream values and a Connection object to
* interact with HBase
*/
def streamForeachPartition[T](dstream: DStream[T],
f: (Iterator[T], Connection) => Unit): Unit = {
dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
}
/**
* A simple enrichment of the traditional Spark Streaming DStream
* mapPartition.
*
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the DStream values and a Connection object to
* interact with HBase
* @return Returns a new DStream generated by the user-defined
* function, just like a normal mapPartition
*/
def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
f: (Iterator[T], Connection) => Iterator[U]):
DStream[U] = {
dstream.mapPartitions(it => hbaseMapPartition[T, U](
broadcastedConf,
it,
f))
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allows a user to take an RDD,
* generate Puts from it, and send them to HBase.
* The complexity of managing the Connection is
* removed from the developer
*
* @param rdd Original RDD with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in the RDD to a HBase Put
*/
def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
val tName = tableName.getName
rdd.foreachPartition(
it => hbaseForeachPartition[T](
broadcastedConf,
it,
(iterator, connection) => {
val m = connection.getBufferedMutator(TableName.valueOf(tName))
iterator.foreach(T => m.mutate(f(T)))
m.flush()
m.close()
}))
}
def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){
credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
if (!appliedCredentials && credentials != null) {
appliedCredentials = true
@transient val ugi = UserGroupInformation.getCurrentUser
ugi.addCredentials(credentials)
// specify that this is a proxy user
ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
ugi.addCredentials(credentialsConf.value.value)
}
}
/**
* A simple abstraction over the HBaseContext.streamMapPartition method.
*
* It allows a user to take a DStream,
* generate Puts from it, and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param dstream Original DStream with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in
* the DStream to a HBase Put
*/
def streamBulkPut[T](dstream: DStream[T],
tableName: TableName,
f: (T) => Put) = {
val tName = tableName.getName
dstream.foreachRDD((rdd, time) => {
bulkPut(rdd, TableName.valueOf(tName), f)
})
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allows a user to take an RDD, generate Deletes from it,
* and send them to HBase. The complexity of managing the Connection is
* removed from the developer
*
* @param rdd Original RDD with data to iterate over
* @param tableName The name of the table to delete from
* @param f Function to convert a value in the RDD to an
* HBase Delete
* @param batchSize The number of deletes to batch before sending to HBase
*/
def bulkDelete[T](rdd: RDD[T], tableName: TableName,
f: (T) => Delete, batchSize: Integer) {
bulkMutation(rdd, tableName, f, batchSize)
}
/**
* A simple abstraction over the HBaseContext.streamBulkMutation method.
*
* It allows a user to take a DStream,
* generate Deletes from it, and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param dstream Original DStream with data to iterate over
* @param tableName The name of the table to delete from
* @param f Function to convert a value in the DStream to an
* HBase Delete
* @param batchSize The number of deletes to batch before sending to HBase
*/
def streamBulkDelete[T](dstream: DStream[T],
tableName: TableName,
f: (T) => Delete,
batchSize: Integer) = {
streamBulkMutation(dstream, tableName, f, batchSize)
}
/**
* Underlying function to support all bulk mutations
*
* May be opened up if requested
*/
private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
f: (T) => Mutation, batchSize: Integer) {
val tName = tableName.getName
rdd.foreachPartition(
it => hbaseForeachPartition[T](
broadcastedConf,
it,
(iterator, connection) => {
val table = connection.getTable(TableName.valueOf(tName))
val mutationList = new java.util.ArrayList[Mutation]
iterator.foreach(T => {
mutationList.add(f(T))
if (mutationList.size >= batchSize) {
table.batch(mutationList, null)
mutationList.clear()
}
})
if (mutationList.size() > 0) {
table.batch(mutationList, null)
mutationList.clear()
}
table.close()
}))
}
/**
* Underlying function to support all bulk streaming mutations
*
* May be opened up if requested
*/
private def streamBulkMutation[T](dstream: DStream[T],
tableName: TableName,
f: (T) => Mutation,
batchSize: Integer) = {
val tName = tableName.getName
dstream.foreachRDD((rdd, time) => {
bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
})
}
/**
* A simple abstraction over the HBaseContext.mapPartition method.
*
* It allows a user to take an RDD and generate a
* new RDD based on Gets and the results they bring back from HBase
*
* @param rdd Original RDD with data to iterate over
* @param tableName The name of the table to get from
* @param batchSize The number of Gets to be sent in a single batch
* @param makeGet Function to convert a value in the RDD to an
* HBase Get
* @param convertResult This will convert the HBase Result object to
* whatever the user wants to put in the resulting
* RDD
* @return New RDD that is created by the Gets to HBase
*/
def bulkGet[T, U: ClassTag](tableName: TableName,
batchSize: Integer,
rdd: RDD[T],
makeGet: (T) => Get,
convertResult: (Result) => U): RDD[U] = {
val getMapPartition = new GetMapPartition(tableName,
batchSize,
makeGet,
convertResult)
rdd.mapPartitions[U](it =>
hbaseMapPartition[T, U](
broadcastedConf,
it,
getMapPartition.run))
}
/**
* A simple abstraction over the HBaseContext.streamMap method.
*
* It allows a user to take a DStream and
* generate a new DStream based on Gets and the results
* they bring back from HBase
*
* @param tableName The name of the table to get from
* @param batchSize The number of Gets to be sent in a single batch
* @param dStream Original DStream with data to iterate over
* @param makeGet Function to convert a value in the DStream to an
* HBase Get
* @param convertResult This will convert the HBase Result object to
* whatever the user wants to put in the resulting
* DStream
* @return A new DStream that is created by the Get to HBase
*/
def streamBulkGet[T, U: ClassTag](tableName: TableName,
batchSize: Integer,
dStream: DStream[T],
makeGet: (T) => Get,
convertResult: (Result) => U): DStream[U] = {
val getMapPartition = new GetMapPartition(tableName,
batchSize,
makeGet,
convertResult)
dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
broadcastedConf,
it,
getMapPartition.run))
}
/**
* This function will use the native HBase TableInputFormat with the
* given scan object to generate a new RDD
*
* @param tableName the name of the table to scan
* @param scan the HBase scan object to use to read data from HBase
* @param f function to convert a Result object from HBase into
* what the user wants in the final generated RDD
* @return new RDD with results from scan
*/
def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
val job: Job = Job.getInstance(getConf(broadcastedConf))
TableMapReduceUtil.initCredentials(job)
TableMapReduceUtil.initTableMapperJob(tableName, scan,
classOf[IdentityTableMapper], null, null, job)
sc.newAPIHadoopRDD(job.getConfiguration,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]).map(f)
}
/**
* An overloaded version of HBaseContext hbaseRDD that defines the
* type of the resulting RDD
*
* @param tableName the name of the table to scan
* @param scans the HBase scan object to use to read data from HBase
* @return New RDD with results from scan
*
*/
def hbaseRDD(tableName: TableName, scans: Scan):
RDD[(ImmutableBytesWritable, Result)] = {
hbaseRDD[(ImmutableBytesWritable, Result)](
tableName,
scans,
(r: (ImmutableBytesWritable, Result)) => r)
}
/**
* Underlying wrapper for all foreach functions in HBaseContext
*/
private def hbaseForeachPartition[T](configBroadcast:
Broadcast[SerializableWritable[Configuration]],
it: Iterator[T],
f: (Iterator[T], Connection) => Unit) = {
val config = getConf(configBroadcast)
applyCreds(configBroadcast)
// specify that this is a proxy user
val connection = ConnectionFactory.createConnection(config)
f(it, connection)
connection.close()
}
private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
Configuration = {
if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
val inputStream = fs.open(new Path(tmpHdfsConfgFile))
tmpHdfsConfiguration = new Configuration(false)
tmpHdfsConfiguration.readFields(inputStream)
inputStream.close()
}
if (tmpHdfsConfiguration == null) {
try {
tmpHdfsConfiguration = configBroadcast.value.value
} catch {
case ex: Exception => logError("Unable to getConfig from broadcast", ex)
}
}
tmpHdfsConfiguration
}
/**
* Underlying wrapper for all mapPartition functions in HBaseContext
*
*/
private def hbaseMapPartition[K, U](
configBroadcast:
Broadcast[SerializableWritable[Configuration]],
it: Iterator[K],
mp: (Iterator[K], Connection) =>
Iterator[U]): Iterator[U] = {
val config = getConf(configBroadcast)
applyCreds(configBroadcast)
val connection = ConnectionFactory.createConnection(config)
val res = mp(it, connection)
connection.close()
res
}
/**
* Underlying wrapper for all get mapPartition functions in HBaseContext
*/
private class GetMapPartition[T, U](tableName: TableName,
batchSize: Integer,
makeGet: (T) => Get,
convertResult: (Result) => U)
extends Serializable {
val tName = tableName.getName
def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
val table = connection.getTable(TableName.valueOf(tName))
val gets = new java.util.ArrayList[Get]()
var res = List[U]()
while (iterator.hasNext) {
gets.add(makeGet(iterator.next()))
if (gets.size() == batchSize) {
val results = table.get(gets)
res = res ++ results.map(convertResult)
gets.clear()
}
}
if (gets.size() > 0) {
val results = table.get(gets)
res = res ++ results.map(convertResult)
gets.clear()
}
table.close()
res.iterator
}
}
/**
* Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
*
* This method is used to keep ClassTags out of the external Java API, as
* the Java compiler cannot produce them automatically. While this
* ClassTag-faking does please the compiler, it can cause problems at runtime
* if the Scala API relies on ClassTags for correctness.
*
* Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
* just worse performance or security issues.
* For instance, an Array of AnyRef can hold any type T, but may lose primitive
* specialization.
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}
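
The class above is typically driven from a small Spark job. Below is a minimal, illustrative usage sketch, not part of this patch; the object name, the table name "t1", and the column family "c" are assumptions made for the example:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseContextUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseContextUsageSketch"))
    try {
      // One HBaseContext per SparkContext; it broadcasts the configuration to executors.
      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

      // bulkPut: one Put per RDD element, written through a BufferedMutator per partition.
      val rdd = sc.parallelize(Seq("1", "2", "3"))
      hbaseContext.bulkPut[String](rdd, TableName.valueOf("t1"),
        rowKey => new Put(Bytes.toBytes(rowKey))
          .addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), Bytes.toBytes(rowKey)))

      // hbaseRDD: distributed scan of the same table via TableInputFormat.
      val scanned = hbaseContext.hbaseRDD(TableName.valueOf("t1"), new Scan())
      println("rows scanned: " + scanned.count())
    } finally {
      sc.stop()
    }
  }
}

The same pattern applies to bulkDelete and bulkGet: build an RDD of keys, hand HBaseContext a conversion function, and let it manage the Connection per partition.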


@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag
/**
* HBaseDStreamFunctions contains a set of implicit functions that can be
* applied to a Spark DStream so that we can easily interact with HBase
*/
object HBaseDStreamFunctions {
/**
* These are implicit methods for a DStream that contains any type of
* data.
*
* @param dStream This is for dStreams of any type
* @tparam T Type T
*/
implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
/**
* Implicit method that gives easy access to HBaseContext's bulk
* put. This will not return a new Stream. Think of it like a foreach
*
* @param hc The hbaseContext object to identify which
* HBase cluster connection to use
* @param tableName The tableName that the put will be sent to
* @param f The function that will turn the DStream values
* into HBase Put objects.
*/
def hbaseBulkPut(hc: HBaseContext,
tableName: TableName,
f: (T) => Put): Unit = {
hc.streamBulkPut(dStream, tableName, f)
}
/**
* Implicit method that gives easy access to HBaseContext's bulk
* get. This will return a new DStream. Think of it as a DStream map
* function, in that every DStream value will get a new value out of
* HBase. That new value will populate the newly generated DStream.
*
* @param hc The hbaseContext object to identify which
* HBase cluster connection to use
* @param tableName The tableName that the Gets will be sent to
* @param batchSize How many Gets to execute in a single batch
* @param f The function that will turn the DStream values
* into HBase Get objects
* @param convertResult The function that will convert a HBase
* Result object into a value that will go
* into the resulting DStream
* @tparam R The type of Object that will be coming
* out of the resulting DStream
* @return A resulting DStream with type R objects
*/
def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
tableName: TableName,
batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
DStream[R] = {
hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
}
/**
* Implicit method that gives easy access to HBaseContext's bulk
* get. This will return a new DStream. Think of it as a DStream map
* function, in that every DStream value will get a new value out of
* HBase. That new value will populate the newly generated DStream.
*
* @param hc The hbaseContext object to identify which
* HBase cluster connection to use
* @param tableName The tableName that the Gets will be sent to
* @param batchSize How many Gets to execute in a single batch
* @param f The function that will turn the DStream values
* into HBase Get objects
* @return A resulting DStream with type R objects
*/
def hbaseBulkGet(hc: HBaseContext,
tableName: TableName, batchSize:Int,
f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
tableName, batchSize, dStream, f,
result => (new ImmutableBytesWritable(result.getRow), result))
}
/**
* Implicit method that gives easy access to HBaseContext's bulk
* Delete. This will not return a new DStream.
*
* @param hc The hbaseContext object to identify which HBase
* cluster connection to use
* @param tableName The tableName that the deletes will be sent to
* @param f The function that will convert the DStream value into
* a HBase Delete Object
* @param batchSize The number of Deletes to be sent in a single batch
*/
def hbaseBulkDelete(hc: HBaseContext,
tableName: TableName,
f:(T) => Delete, batchSize:Int): Unit = {
hc.streamBulkDelete(dStream, tableName, f, batchSize)
}
/**
* Implicit method that gives easy access to HBaseContext's
* foreachPartition method. This will act very much like a normal DStream
* foreach method except that you will now have an HBase connection
* while iterating through the values.
*
* @param hc The hbaseContext object to identify which HBase
* cluster connection to use
* @param f This function will get an iterator for a partition of a
* DStream along with a Connection object to HBase
*/
def hbaseForeachPartition(hc: HBaseContext,
f: (Iterator[T], Connection) => Unit): Unit = {
hc.streamForeachPartition(dStream, f)
}
/**
* Implicit method that gives easy access to HBaseContext's
* mapPartitions method. This will act very much like a normal DStream
* mapPartitions method except that you will now have an
* HBase connection while iterating through the values
*
* @param hc The hbaseContext object to identify which HBase
* cluster connection to use
* @param f This function will get an iterator for a partition of a
* DStream along with a Connection object to HBase
* @tparam R This is the type of objects that will go into the resulting
* DStream
* @return A resulting DStream of type R
*/
def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
f: (Iterator[T], Connection) => Iterator[R]):
DStream[R] = {
hc.streamMapPartitions(dStream, f)
}
}
}
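
Because GenericHBaseDStreamFunctions is an implicit class, a streaming job only needs to import HBaseDStreamFunctions._ to call these methods directly on a DStream. A minimal, illustrative sketch follows, not part of this patch; the host, port, table name "t1", and column family "c" are assumptions made for the example:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HBaseDStreamUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseDStreamUsageSketch"))
    val ssc = new StreamingContext(sc, Seconds(1))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // Importing HBaseDStreamFunctions._ adds hbaseBulkPut (and friends) to any DStream.
    // Each line received on the socket becomes one Put keyed by the line itself.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"),
      line => new Put(Bytes.toBytes(line))
        .addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), Bytes.toBytes(line)))

    ssc.start()
    ssc.awaitTermination()
  }
}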


@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
/**
* HBaseRDDFunctions contains a set of implicit functions that can be
* applied to a Spark RDD so that we can easily interact with HBase
*/
object HBaseRDDFunctions
{
/**
* These are implicit methods for a RDD that contains any type of
* data.
*
* @param rdd This is for rdd of any type
* @tparam T This is any type
*/
implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
/**
* Implicit method that gives easy access to HBaseContext's bulk
* put. This will not return a new RDD. Think of it like a foreach
*
* @param hc The hbaseContext object to identify which
* HBase cluster connection to use
* @param tableName The tableName that the put will be sent to
* @param f The function that will turn the RDD values
* into HBase Put objects.
*/
def hbaseBulkPut(hc: HBaseContext,
tableName: TableName,
f: (T) => Put): Unit = {
hc.bulkPut(rdd, tableName, f)
}
/**
* Implicit method that gives easy access to HBaseContext's bulk
* get. This will return a new RDD. Think of it as an RDD map
* function, in which every RDD value is used to look up a new value in
* HBase, and that new value populates the newly generated RDD.
*
* @param hc The hbaseContext object to identify which
* HBase cluster connection to use
* @param tableName     The tableName that the Gets will be sent to
* @param batchSize     How many Gets to execute in a single batch
* @param f             The function that will turn the RDD values
*                      into HBase Get objects
* @param convertResult The function that will convert a HBase
* Result object into a value that will go
* into the resulting RDD
* @tparam R The type of Object that will be coming
* out of the resulting RDD
* @return A resulting RDD with type R objects
*/
def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
tableName: TableName, batchSize:Int,
f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
}
/**
* Implicit method that gives easy access to HBaseContext's bulk
* get. This will return a new RDD. Think of it as an RDD map
* function, in which every RDD value is used to look up a new value in
* HBase, and that new value populates the newly generated RDD.
*
* @param hc The hbaseContext object to identify which
* HBase cluster connection to use
* @param tableName     The tableName that the Gets will be sent to
* @param batchSize     How many Gets to execute in a single batch
* @param f             The function that will turn the RDD values
*                      into HBase Get objects
* @return              A resulting RDD of (ImmutableBytesWritable, Result) tuples
*/
def hbaseBulkGet(hc: HBaseContext,
tableName: TableName, batchSize:Int,
f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
batchSize, rdd, f,
result => if (result != null && result.getRow != null) {
(new ImmutableBytesWritable(result.getRow), result)
} else {
null
})
}
/**
* Implicit method that gives easy access to HBaseContext's bulk
* Delete. This will not return a new RDD.
*
* @param hc The hbaseContext object to identify which HBase
* cluster connection to use
* @param tableName The tableName that the deletes will be sent to
* @param f The function that will convert the RDD value into
* a HBase Delete Object
* @param batchSize The number of Deletes to be sent in a single batch
*/
def hbaseBulkDelete(hc: HBaseContext,
tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
hc.bulkDelete(rdd, tableName, f, batchSize)
}
/**
* Implicit method that gives easy access to HBaseContext's
* foreachPartition method. This will act very much like a normal RDD
* foreach method, except that you will have an HBase connection
* while iterating through the values.
*
* @param hc The hbaseContext object to identify which HBase
* cluster connection to use
* @param f This function will get an iterator for a Partition of an
* RDD along with a connection object to HBase
*/
def hbaseForeachPartition(hc: HBaseContext,
f: (Iterator[T], Connection) => Unit): Unit = {
hc.foreachPartition(rdd, f)
}
/**
* Implicit method that gives easy access to HBaseContext's
* mapPartitions method. This will act very much like a normal RDD
* mapPartitions method, except that you will have an
* HBase connection while iterating through the values
*
* @param hc The hbaseContext object to identify which HBase
* cluster connection to use
* @param f This function will get an iterator for a Partition of an
* RDD along with a connection object to HBase
* @tparam R This is the type of objects that will go into the resulting
* RDD
* @return A resulting RDD of type R
*/
def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
f: (Iterator[T], Connection) => Iterator[R]):
RDD[R] = {
hc.mapPartitions[T,R](rdd, f)
}
}
}
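// Example usage (a minimal sketch; "sc", "hbaseContext" and table "t1" are assumed
// placeholders, mirroring the examples shipped under example.rdd):
//
//   import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
//   val rowKeys = sc.parallelize(Array(Bytes.toBytes("1"), Bytes.toBytes("2")))
//   val rows = rowKeys.hbaseBulkGet[String](hbaseContext, TableName.valueOf("t1"), 2,
//     rowKey => new Get(rowKey),
//     result => Bytes.toString(result.getRow))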


@ -0,0 +1,347 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.TableName
import org.apache.spark.api.java.JavaSparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.VoidFunction
import org.apache.spark.api.java.function.Function
import org.apache.hadoop.hbase.client.Connection
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.api.java.function.FlatMapFunction
import scala.collection.JavaConversions._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import scala.reflect.ClassTag
/**
* This is the Java Wrapper over HBaseContext which is written in
* Scala. This class will be used by developers who want to
* work with Spark or Spark Streaming in Java
*
* @param jsc    This is the JavaSparkContext that we will wrap
* @param config This is the configuration information for our HBase cluster
*/
class JavaHBaseContext(@transient jsc: JavaSparkContext,
@transient config: Configuration) extends Serializable {
val hbaseContext = new HBaseContext(jsc.sc, config)
/**
* A simple enrichment of the traditional Spark javaRdd foreachPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param javaRdd Original javaRdd with data to iterate over
* @param f       Function to be given an iterator to iterate through
*                the RDD values and a Connection object to interact
* with HBase
*/
def foreachPartition[T](javaRdd: JavaRDD[T],
f: VoidFunction[(java.util.Iterator[T], Connection)] ) = {
hbaseContext.foreachPartition(javaRdd.rdd,
(it:Iterator[T], conn:Connection) =>
{ f.call((it, conn)) })
}
/**
* A simple enrichment of the traditional Spark Streaming dStream foreach
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param javaDstream Original DStream with data to iterate over
* @param f           Function to be given an iterator to iterate through
*                    the JavaDStream values and a Connection object to
* interact with HBase
*/
def foreachPartition[T](javaDstream: JavaDStream[T],
f: VoidFunction[(Iterator[T], Connection)]) = {
hbaseContext.foreachPartition(javaDstream.dstream,
(it:Iterator[T], conn: Connection) => f.call(it, conn))
}
/**
* A simple enrichment of the traditional Spark JavaRDD mapPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param javaRdd Original JavaRdd with data to iterate over
* @param f       Function to be given an iterator to iterate through
*                the RDD values and a Connection object to interact
* with HBase
* @return Returns a new RDD generated by the user-defined
* function just like normal mapPartition
*/
def mapPartitions[T,R](javaRdd: JavaRDD[T],
f: FlatMapFunction[(java.util.Iterator[T],
Connection),R] ): JavaRDD[R] = {
def fn = (it: Iterator[T], conn: Connection) =>
asScalaIterator(
f.call((asJavaIterator(it), conn)).iterator()
)
JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
(iterator:Iterator[T], connection:Connection) =>
fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
}
/**
* A simple enrichment of the traditional Spark Streaming JavaDStream
* mapPartition.
*
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param javaDstream Original JavaDStream with data to iterate over
* @param mp          Function to be given an iterator to iterate through
*                    the JavaDStream values and a Connection object to
* interact with HBase
* @return Returns a new JavaDStream generated by the
*         user-defined function just like normal mapPartition
*/
def streamMap[T, U](javaDstream: JavaDStream[T],
mp: Function[(Iterator[T], Connection), Iterator[U]]):
JavaDStream[U] = {
JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
(it: Iterator[T], conn: Connection) =>
mp.call(it, conn) )(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allows a user to take a JavaRDD,
* generate Puts, and send them to HBase.
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDdd Original JavaRDD with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in the JavaRDD
* to a HBase Put
*/
def bulkPut[T](javaDdd: JavaRDD[T],
tableName: TableName,
f: Function[(T), Put]) {
hbaseContext.bulkPut(javaDdd.rdd, tableName, (t:T) => f.call(t))
}
/**
* A simple abstraction over the HBaseContext.streamMapPartition method.
*
* It allows a user to take a JavaDStream,
* generate Puts, and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDstream Original DStream with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in
* the JavaDStream to a HBase Put
*/
def streamBulkPut[T](javaDstream: JavaDStream[T],
tableName: TableName,
f: Function[T,Put]) = {
hbaseContext.streamBulkPut(javaDstream.dstream,
tableName,
(t:T) => f.call(t))
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allows a user to take a JavaRDD,
* generate Deletes, and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaRdd   Original JavaRDD with data to iterate over
* @param tableName The name of the table to delete from
* @param f         Function to convert a value in the JavaRDD to an
*                  HBase Delete
* @param batchSize The number of deletes to batch before sending to HBase
*/
def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
f: Function[T, Delete], batchSize:Integer) {
hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t:T) => f.call(t), batchSize)
}
/**
* A simple abstraction over the HBaseContext.streamBulkMutation method.
*
* It allows a user to take a JavaDStream,
* generate Deletes, and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDstream Original DStream with data to iterate over
* @param tableName   The name of the table to delete from
* @param f           Function to convert a value in the JavaDStream to an
*                    HBase Delete
* @param batchSize The number of deletes to be sent at once
*/
def streamBulkDelete[T](javaDstream: JavaDStream[T],
tableName: TableName,
f: Function[T, Delete],
batchSize: Integer) = {
hbaseContext.streamBulkDelete(javaDstream.dstream, tableName,
(t:T) => f.call(t),
batchSize)
}
/**
* A simple abstraction over the HBaseContext.mapPartition method.
*
* It allows a user to take a JavaRDD and generate a
* new RDD based on Gets and the results they bring back from HBase
*
* @param tableName     The name of the table to get from
* @param batchSize     batch size of how many gets to retrieve in a single fetch
* @param javaRdd       Original JavaRDD with data to iterate over
* @param makeGet       Function to convert a value in the JavaRDD to an
*                      HBase Get
* @param convertResult This will convert the HBase Result object to
*                      whatever the user wants to put in the resulting
*                      JavaRDD
* @return              New JavaRDD that is created by the Gets to HBase
*/
def bulkGet[T, U](tableName: TableName,
batchSize:Integer,
javaRdd: JavaRDD[T],
makeGet: Function[T, Get],
convertResult: Function[Result, U]): JavaRDD[U] = {
JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
batchSize,
javaRdd.rdd,
(t:T) => makeGet.call(t),
(r:Result) => {convertResult.call(r)})(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* A simple abstraction over the HBaseContext.streamMap method.
*
* It allows a user to take a DStream and
* generate a new DStream based on Gets and the results
* they bring back from HBase
*
* @param tableName     The name of the table to get from
* @param batchSize     The number of gets to be batched together
* @param javaDStream   Original DStream with data to iterate over
* @param makeGet       Function to convert a value in the JavaDStream to an
*                      HBase Get
* @param convertResult This will convert the HBase Result object to
*                      whatever the user wants to put in the resulting
*                      JavaDStream
* @return              New JavaDStream that is created by the Gets to HBase
*/
def streamBulkGet[T, U](tableName:TableName,
batchSize:Integer,
javaDStream: JavaDStream[T],
makeGet: Function[T, Get],
convertResult: Function[Result, U]): JavaDStream[U] = {
JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
batchSize,
javaDStream.dstream,
(t:T) => makeGet.call(t),
(r:Result) => convertResult.call(r) )(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* This function will use the native HBase TableInputFormat with the
* given scan object to generate a new JavaRDD
*
* @param tableName the name of the table to scan
* @param scans the HBase scan object to use to read data from HBase
* @param f function to convert a Result object from HBase into
* what the user wants in the final generated JavaRDD
* @return new JavaRDD with results from scan
*/
def hbaseRDD[U](tableName: TableName,
scans: Scan,
f: Function[(ImmutableBytesWritable, Result), U]):
JavaRDD[U] = {
JavaRDD.fromRDD(
hbaseContext.hbaseRDD[U](tableName,
scans,
(v:(ImmutableBytesWritable, Result)) =>
f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* An overloaded version of HBaseContext hbaseRDD that defines the
* type of the resulting JavaRDD
*
* @param tableName the name of the table to scan
* @param scans the HBase scan object to use to read data from HBase
* @return New JavaRDD with results from scan
*
*/
def hbaseRDD(tableName: TableName,
scans: Scan):
JavaRDD[(ImmutableBytesWritable, Result)] = {
JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
}
/**
* Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
*
* This method is used to keep ClassTags out of the external Java API, as the Java compiler
* cannot produce them automatically. While this ClassTag-faking does please the compiler,
* it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
*
* Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
* just worse performance or security issues.
* For instance, an Array[AnyRef] can hold any type T,
* but may lose primitive
* specialization.
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}


@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Delete
import org.apache.spark.SparkConf
/**
* This is a simple example of deleting records in HBase
* with the bulkDelete function.
*/
object HBaseBulkDeleteExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkDeletesExample {tableName} ")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[Array[Byte]]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5")
))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.bulkDelete[Array[Byte]](rdd,
TableName.valueOf(tableName),
putRecord => new Delete(putRecord),
4)
} finally {
sc.stop()
}
}
}


@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.spark.SparkConf
/**
* This is a simple example of getting records in HBase
* with the bulkGet function.
*/
object HBaseBulkGetExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkGetExample {tableName}")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte])]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5"),
Bytes.toBytes("6"),
Bytes.toBytes("7")))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf(tableName),
2,
rdd,
record => {
System.out.println("making Get")
new Get(record)
},
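// the second function converts each HBase Result into the String that populates the resulting RDD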
(result: Result) => {
val it = result.listCells().iterator()
val b = new StringBuilder
b.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next()
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
if (q.equals("counter")) {
b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
b.toString()
})
getRdd.collect().foreach(v => println(v))
} finally {
sc.stop()
}
}
}


@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkConf
/**
* This is a simple example of putting records in HBase
* with the bulkPut function.
*/
object HBaseBulkPutExample {
def main(args: Array[String]) {
if (args.length < 2) {
println("HBaseBulkPutExample {tableName} {columnFamily}")
return
}
val tableName = args(0)
val columnFamily = args(1)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) =>
put.addColumn(putValue._1, putValue._2, putValue._3))
put
});
} finally {
sc.stop()
}
}
}


@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
/**
* This is a simple example of putting records in HBase
* with the bulkPut function. In this example we are
* getting the put information from a file
*/
object HBaseBulkPutExampleFromFile {
def main(args: Array[String]) {
if (args.length < 3) {
println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}")
return
}
val tableName = args(0)
val columnFamily = args(1)
val inputFile = args(2)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " +
tableName + " " + columnFamily + " " + inputFile)
val sc = new SparkContext(sparkConf)
try {
var rdd = sc.hadoopFile(
inputFile,
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text]).map(v => {
System.out.println("reading-" + v._2.toString)
v._2.toString
})
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.bulkPut[String](rdd,
TableName.valueOf(tableName),
(putRecord) => {
System.out.println("hbase-" + putRecord)
val put = new Put(Bytes.toBytes("Value- " + putRecord))
put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"),
Bytes.toBytes(putRecord.length()))
put
});
} finally {
sc.stop()
}
}
}


@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkConf
/**
* This is a simple example of putting records in HBase
* with the bulkPut function. In this example we are
* also setting the timestamp in the put
*/
object HBaseBulkPutTimestampExample {
def main(args: Array[String]) {
if (args.length < 2) {
System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily}")
return
}
val tableName = args(0)
val columnFamily = args(1)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
val rdd = sc.parallelize(Array(
(Bytes.toBytes("6"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
(Bytes.toBytes("7"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
(Bytes.toBytes("8"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
(Bytes.toBytes("9"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
(Bytes.toBytes("10"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))))
val conf = HBaseConfiguration.create()
val timeStamp = System.currentTimeMillis()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
timeStamp, putValue._3))
put
})
} finally {
sc.stop()
}
}
}


@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.SparkConf
/**
* This is a simple example of scanning records from HBase
* with the hbaseRDD function.
*/
object HBaseDistributedScanExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("GenerateGraphs {tableName}")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName )
val sc = new SparkContext(sparkConf)
try {
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
val scan = new Scan()
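// setCaching controls how many rows each RPC to a region server returns at a time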
scan.setCaching(100)
val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
getRdd.foreach(v => println(Bytes.toString(v._1.get())))
println("Length: " + getRdd.map(r => r._1.copyBytes()).collect().length);
//.collect().foreach(v => println(Bytes.toString(v._1.get())))
} finally {
sc.stop()
}
}
}


@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.hbasecontext
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
/**
* This is a simple example of BulkPut with Spark Streaming
*/
object HBaseStreamingBulkPutExample {
def main(args: Array[String]) {
if (args.length < 4) {
println("HBaseStreamingBulkPutExample " +
"{host} {port} {tableName} {columnFamily}")
return
}
val host = args(0)
val port = args(1)
val tableName = args(2)
val columnFamily = args(3)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream(host, port.toInt)
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.streamBulkPut[String](lines,
TableName.valueOf(tableName),
(putRecord) => {
if (putRecord.length() > 0) {
val put = new Put(Bytes.toBytes(putRecord))
put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar"))
put
} else {
null
}
})
ssc.start()
ssc.awaitTerminationOrTimeout(60000)
} finally {
sc.stop()
}
}
}


@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.rdd
import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkContext, SparkConf}
/**
* This is a simple example of deleting records in HBase
* with the bulkDelete function.
*/
object HBaseBulkDeleteExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkDeletesExample {tableName} ")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[Array[Byte]]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5")
))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName),
putRecord => new Delete(putRecord),
4)
} finally {
sc.stop()
}
}
}


@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.rdd
import org.apache.hadoop.hbase.client.{Result, Get}
import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.spark.{SparkContext, SparkConf}
/**
* This is a simple example of getting records in HBase
* with the bulkGet function.
*/
object HBaseBulkGetExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkGetExample {tableName}")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte])]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5"),
Bytes.toBytes("6"),
Bytes.toBytes("7")))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
record => {
System.out.println("making Get")
new Get(record)
},
(result: Result) => {
val it = result.listCells().iterator()
val b = new StringBuilder
b.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next()
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
if (q.equals("counter")) {
b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
b.toString()
})
getRdd.collect().foreach(v => println(v))
} finally {
sc.stop()
}
}
}


@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.rdd
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.{SparkConf, SparkContext}
/**
* This is a simple example of putting records in HBase
* with the bulkPut function.
*/
object HBaseBulkPutExample {
def main(args: Array[String]) {
if (args.length < 2) {
println("HBaseBulkPutExample {tableName} {columnFamily}")
return
}
val tableName = args(0)
val columnFamily = args(1)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
putValue._3))
put
})
} finally {
sc.stop()
}
}
}


@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.rdd
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkContext, SparkConf}
/**
* This is a simple example of using the foreachPartition
* method with a HBase connection
*/
object HBaseForeachPartitionExample {
def main(args: Array[String]) {
if (args.length < 2) {
println("HBaseBulkPutExample {tableName} {columnFamily}")
return
}
val tableName = args(0)
val columnFamily = args(1)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
rdd.hbaseForeachPartition(hbaseContext,
(it, connection) => {
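// BufferedMutator buffers the Puts client-side and flushes them to HBase in batches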
val m = connection.getBufferedMutator(TableName.valueOf(tableName))
it.foreach(r => {
val put = new Put(r._1)
r._2.foreach((putValue) =>
put.addColumn(putValue._1, putValue._2, putValue._3))
m.mutate(put)
})
m.flush()
m.close()
})
} finally {
sc.stop()
}
}
}


@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark.example.rdd
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkContext, SparkConf}
/**
* This is a simple example of using the mapPartitions
* method with a HBase connection
*/
object HBaseMapPartitionExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkGetExample {tableName}")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte])]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5"),
Bytes.toBytes("6"),
Bytes.toBytes("7")))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
val table = connection.getTable(TableName.valueOf(tableName))
it.map{r =>
  //batching would be faster. This is just an example
  val result = table.get(new Get(r))
  // use CellUtil to copy out the qualifier and value bytes; the raw
  // getQualifierArray/getValueArray calls return the whole backing array
  val cells = result.listCells().iterator()
  val b = new StringBuilder
  b.append(Bytes.toString(result.getRow) + ":")
  while (cells.hasNext) {
    val cell = cells.next()
    val q = Bytes.toString(CellUtil.cloneQualifier(cell))
    if (q.equals("counter")) {
      b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
    } else {
      b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
b.toString()
}
})
getRdd.collect().foreach(v => println(v))
} finally {
sc.stop()
}
}
}


@ -0,0 +1,334 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.junit.*;
import scala.Tuple2;
import com.google.common.io.Files;
public class JavaHBaseContextSuite implements Serializable {
private transient JavaSparkContext jsc;
HBaseTestingUtility htu;
protected static final Log LOG = LogFactory.getLog(JavaHBaseContextSuite.class);
byte[] tableName = Bytes.toBytes("t1");
byte[] columnFamily = Bytes.toBytes("c");
String columnFamilyStr = Bytes.toString(columnFamily);
@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
jsc.addJar("spark.jar");
File tempDir = Files.createTempDir();
tempDir.deleteOnExit();
htu = HBaseTestingUtility.createLocalHTU();
try {
LOG.info("cleaning up test dir");
htu.cleanupTestDir();
LOG.info("starting minicluster");
htu.startMiniZKCluster();
htu.startMiniHBaseCluster(1, 1);
LOG.info(" - minicluster started");
try {
htu.deleteTable(TableName.valueOf(tableName));
} catch (Exception e) {
LOG.info(" - no table " + Bytes.toString(tableName) + " found");
}
LOG.info(" - creating table " + Bytes.toString(tableName));
htu.createTable(TableName.valueOf(tableName),
columnFamily);
LOG.info(" - created table");
} catch (Exception e1) {
throw new RuntimeException(e1);
}
}
@After
public void tearDown() {
try {
htu.deleteTable(TableName.valueOf(tableName));
LOG.info("shuting down minicluster");
htu.shutdownMiniHBaseCluster();
htu.shutdownMiniZKCluster();
LOG.info(" - minicluster shut down");
htu.cleanupTestDir();
} catch (Exception e) {
throw new RuntimeException(e);
}
jsc.stop();
jsc = null;
}
@Test
public void testBulkPut() throws IOException {
List<String> list = new ArrayList<>();
list.add("1," + columnFamilyStr + ",a,1");
list.add("2," + columnFamilyStr + ",a,2");
list.add("3," + columnFamilyStr + ",a,3");
list.add("4," + columnFamilyStr + ",a,4");
list.add("5," + columnFamilyStr + ",a,5");
JavaRDD<String> rdd = jsc.parallelize(list);
Configuration conf = htu.getConfiguration();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf(tableName));
try {
List<Delete> deletes = new ArrayList<>();
for (int i = 1; i < 6; i++) {
deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
}
table.delete(deletes);
} finally {
table.close();
}
hbaseContext.bulkPut(rdd,
TableName.valueOf(tableName),
new PutFunction());
table = conn.getTable(TableName.valueOf(tableName));
try {
Result result1 = table.get(new Get(Bytes.toBytes("1")));
Assert.assertNotNull("Row 1 should had been deleted", result1.getRow());
Result result2 = table.get(new Get(Bytes.toBytes("2")));
Assert.assertNotNull("Row 2 should had been deleted", result2.getRow());
Result result3 = table.get(new Get(Bytes.toBytes("3")));
Assert.assertNotNull("Row 3 should had been deleted", result3.getRow());
Result result4 = table.get(new Get(Bytes.toBytes("4")));
Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
Result result5 = table.get(new Get(Bytes.toBytes("5")));
Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
} finally {
table.close();
conn.close();
}
}
public static class PutFunction implements Function<String, Put> {
private static final long serialVersionUID = 1L;
public Put call(String v) throws Exception {
String[] cells = v.split(",");
Put put = new Put(Bytes.toBytes(cells[0]));
put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
Bytes.toBytes(cells[3]));
return put;
}
}
@Test
public void testBulkDelete() throws IOException {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = htu.getConfiguration();
populateTableWithMockData(conf, TableName.valueOf(tableName));
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
try (
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf(tableName))
){
Result result1 = table.get(new Get(Bytes.toBytes("1")));
Assert.assertNull("Row 1 should had been deleted", result1.getRow());
Result result2 = table.get(new Get(Bytes.toBytes("2")));
Assert.assertNull("Row 2 should had been deleted", result2.getRow());
Result result3 = table.get(new Get(Bytes.toBytes("3")));
Assert.assertNull("Row 3 should had been deleted", result3.getRow());
Result result4 = table.get(new Get(Bytes.toBytes("4")));
Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
Result result5 = table.get(new Get(Bytes.toBytes("5")));
Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
}
}
@Test
public void testDistributedScan() throws IOException {
Configuration conf = htu.getConfiguration();
populateTableWithMockData(conf, TableName.valueOf(tableName));
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
Scan scan = new Scan();
scan.setCaching(100);
JavaRDD<String> javaRdd =
hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
.map(new ScanConvertFunction());
List<String> results = javaRdd.collect();
Assert.assertEquals(5, results.size());
}
private static class ScanConvertFunction implements
Function<Tuple2<ImmutableBytesWritable, Result>, String> {
@Override
public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
return Bytes.toString(v1._1().copyBytes());
}
}
@Test
public void testBulkGet() throws IOException {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = htu.getConfiguration();
populateTableWithMockData(conf, TableName.valueOf(tableName));
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
final JavaRDD<String> stringJavaRDD =
hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
new GetFunction(),
new ResultFunction());
Assert.assertEquals(5, stringJavaRDD.count());
}
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
public Get call(byte[] v) throws Exception {
return new Get(v);
}
}
public static class ResultFunction implements Function<Result, String> {
private static final long serialVersionUID = 1L;
public String call(Result result) throws Exception {
Iterator<Cell> it = result.listCells().iterator();
StringBuilder b = new StringBuilder();
b.append(Bytes.toString(result.getRow())).append(":");
while (it.hasNext()) {
Cell cell = it.next();
String q = Bytes.toString(CellUtil.cloneQualifier(cell));
if ("counter".equals(q)) {
b.append("(")
.append(q)
.append(",")
.append(Bytes.toLong(CellUtil.cloneValue(cell)))
.append(")");
} else {
b.append("(")
.append(q)
.append(",")
.append(Bytes.toString(CellUtil.cloneValue(cell)))
.append(")");
}
}
return b.toString();
}
}
private void populateTableWithMockData(Configuration conf, TableName tableName)
throws IOException {
try (
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName)) {
List<Put> puts = new ArrayList<>();
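// rows "1" through "5", each with a single cell columnFamily:columnFamily = columnFamily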
for (int i = 1; i < 6; i++) {
Put put = new Put(Bytes.toBytes(Integer.toString(i)));
put.addColumn(columnFamily, columnFamily, columnFamily);
puts.add(put);
}
table.put(puts);
}
}
}


@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseTestingUtility}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
class HBaseContextSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
var TEST_UTIL = new HBaseTestingUtility
val tableName = "t1"
val columnFamily = "c"
override def beforeAll() {
TEST_UTIL.startMiniCluster()
logInfo(" - minicluster started")
try {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
} catch {
case e: Exception =>
logInfo(" - no table " + tableName + " found")
}
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
val envMap = Map[String,String](("Xmx", "512m"))
sc = new SparkContext("local", "test", null, Nil, envMap)
}
override def afterAll() {
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()
logInfo(" - minicluster shut down")
TEST_UTIL.cleanupTestDir()
sc.stop()
}
test("bulkput to test HBase client") {
val config = TEST_UTIL.getConfiguration
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
val hbaseContext = new HBaseContext(sc, config)
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
assert(foo1 == "foo1")
val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
assert(foo2 == "foo2")
val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
assert(foo3 == "foo3")
val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
assert(foo4 == "foo")
val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
assert(foo5 == "bar")
} finally {
table.close()
connection.close()
}
}
test("bulkDelete to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("delete1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("delete2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("delete3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
val rdd = sc.parallelize(Array(
Bytes.toBytes("delete1"),
Bytes.toBytes("delete3")))
val hbaseContext = new HBaseContext(sc, config)
hbaseContext.bulkDelete[Array[Byte]](rdd,
TableName.valueOf(tableName),
putRecord => new Delete(putRecord),
4)
assert(table.get(new Get(Bytes.toBytes("delete1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
assert(table.get(new Get(Bytes.toBytes("delete3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2"))
} finally {
table.close()
connection.close()
}
}
test("bulkGet to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf(tableName),
2,
rdd,
record => {
new Get(record)
},
(result: Result) => {
if (result.listCells() != null) {
val it = result.listCells().iterator()
val B = new StringBuilder
B.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next()
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
if (q.equals("counter")) {
B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
"" + B.toString
} else {
""
}
})
val getArray = getRdd.collect()
assert(getArray.length == 4)
assert(getArray.contains("get1:(a,foo1)"))
assert(getArray.contains("get2:(a,foo2)"))
assert(getArray.contains("get3:(a,foo3)"))
}
test("BulkGet failure test: bad table") {
val config = TEST_UTIL.getConfiguration
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
intercept[SparkException] {
try {
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf("badTableName"),
2,
rdd,
record => {
new Get(record)
},
(result: Result) => "1")
getRdd.collect()
fail("We should have failed and not reached this line")
} catch {
case ex: SparkException => {
assert(
ex.getMessage.contains(
"org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException"))
throw ex
}
}
}
}
test("BulkGet failure test: bad column") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf(tableName),
2,
rdd,
record => {
new Get(record)
},
(result: Result) => {
if (result.listCells() != null) {
val cellValue = result.getColumnLatestCell(
Bytes.toBytes("c"), Bytes.toBytes("bad_column"))
if (cellValue == null) "null" else "bad"
} else "noValue"
})
var nullCounter = 0
var noValueCounter = 0
getRdd.collect().foreach(r => {
if ("null".equals(r)) nullCounter += 1
else if ("noValue".equals(r)) noValueCounter += 1
})
assert(nullCounter == 3)
assert(noValueCounter == 1)
}
test("distributedScan to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("scan1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("scan2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("scan3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
put = new Put(Bytes.toBytes("scan4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
put = new Put(Bytes.toBytes("scan5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val hbaseContext = new HBaseContext(sc, config)
val scan = new Scan()
scan.setCaching(100)
scan.setStartRow(Bytes.toBytes("scan2"))
scan.setStopRow(Bytes.toBytes("scan4_"))
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
try {
val scanList = scanRdd.map(r => r._1.copyBytes()).collect()
assert(scanList.length == 3)
} catch {
case ex: Exception => ex.printStackTrace()
}
}
}
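// An illustrative usage sketch of the HBaseContext API exercised by the suite
// above (bulkPut, bulkDelete, bulkGet, hbaseRDD). Names such as
// HBaseContextUsageSketch are illustrative only; the sketch assumes a running
// SparkContext `sc`, an HBase Configuration `config`, and a table "t1" with
// column family "c". Record shapes and call signatures mirror the tests.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

object HBaseContextUsageSketch {
  def run(sc: SparkContext, config: Configuration): Unit = {
    val hbaseContext = new HBaseContext(sc, config)
    val tableName = TableName.valueOf("t1")

    // bulkPut: the RDD element type is given explicitly and the supplied
    // function builds one Put per record.
    val putRdd = sc.parallelize(Seq(
      (Bytes.toBytes("row1"), Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("v1"))))
    hbaseContext.bulkPut[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](
      putRdd, tableName, r => new Put(r._1).addColumn(r._2, r._3, r._4))

    // bulkGet: batched Gets (batch size 2) with a per-Result converter.
    val keyRdd = sc.parallelize(Seq(Bytes.toBytes("row1")))
    val rowKeys = hbaseContext.bulkGet[Array[Byte], String](
      tableName, 2, keyRdd, key => new Get(key),
      (result: Result) => Bytes.toString(result.getRow))

    // bulkDelete: one Delete per record, flushed in batches of 4.
    hbaseContext.bulkDelete[Array[Byte]](keyRdd, tableName, key => new Delete(key), 4)

    // hbaseRDD: expose a table scan as an RDD of (key, Result) pairs.
    val scan = new Scan()
    scan.setCaching(100)
    val scanRdd = hbaseContext.hbaseRDD(tableName, scan)
  }
}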

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkContext, Logging}
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import scala.collection.mutable
class HBaseDStreamFunctionsSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
val tableName = "t1"
val columnFamily = "c"
override def beforeAll() {
TEST_UTIL.startMiniCluster()
logInfo(" - minicluster started")
try
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
catch {
case e: Exception => logInfo(" - no table " + tableName + " found")
}
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
sc = new SparkContext("local", "test")
}
override def afterAll() {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
TEST_UTIL.shutdownMiniCluster()
sc.stop()
}
test("bulkput to test HBase client") {
val config = TEST_UTIL.getConfiguration
val rdd1 = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3"))))))
val rdd2 = sc.parallelize(Array(
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))
val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
Array[Byte], Array[Byte])])]]()
queue += rdd1
queue += rdd2
val dStream = ssc.queueStream(queue)
dStream.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
ssc.start()
ssc.awaitTerminationOrTimeout(1000)
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
assert(foo1 == "foo1")
val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
assert(foo2 == "foo2")
val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
assert(foo3 == "foo3")
val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
assert(foo4 == "foo")
val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
assert(foo5 == "bar")
} finally {
table.close()
connection.close()
}
}
}
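// An illustrative usage sketch of the DStream extension exercised above:
// importing HBaseDStreamFunctions._ adds hbaseBulkPut to a DStream, so every
// micro-batch is written to HBase as a batch of Puts. Names such as
// HBaseDStreamUsageSketch are illustrative; the sketch assumes a running
// SparkContext `sc`, an HBase Configuration `config`, and a table "t1" with
// column family "c". The queue-backed stream mirrors the test setup.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

import scala.collection.mutable

object HBaseDStreamUsageSketch {
  def run(sc: SparkContext, config: Configuration): Unit = {
    val hbaseContext = new HBaseContext(sc, config)
    val ssc = new StreamingContext(sc, Milliseconds(200))

    // Each record is (rowKey, Array((family, qualifier, value))).
    val queue = mutable.Queue[RDD[(Array[Byte],
      Array[(Array[Byte], Array[Byte], Array[Byte])])]]()
    queue += sc.parallelize(Seq(
      (Bytes.toBytes("row1"),
        Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("v1"))))))
    val dStream = ssc.queueStream(queue)

    // Build one Put per record; the extension writes each micro-batch to HBase.
    dStream.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), record => {
      val put = new Put(record._1)
      record._2.foreach(v => put.addColumn(v._1, v._2, v._3))
      put
    })

    ssc.start()
    ssc.awaitTerminationOrTimeout(1000)
  }
}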

View File

@ -0,0 +1,398 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.spark.{Logging, SparkContext}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import scala.collection.mutable
class HBaseRDDFunctionsSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
val tableName = "t1"
val columnFamily = "c"
override def beforeAll() {
TEST_UTIL.startMiniCluster
logInfo(" - minicluster started")
try
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
catch {
case e: Exception => logInfo(" - no table " + tableName + " found")
}
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
sc = new SparkContext("local", "test")
}
override def afterAll() {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()
sc.stop()
}
test("bulkput to test HBase client") {
val config = TEST_UTIL.getConfiguration
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
assert(foo1 == "foo1")
val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
assert(foo2 == "foo2")
val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
assert(foo3 == "foo3")
val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
assert(foo4 == "foo")
val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
assert(foo5 == "bar")
} finally {
table.close()
connection.close()
}
}
test("bulkDelete to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("delete1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("delete2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("delete3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
val rdd = sc.parallelize(Array(
Bytes.toBytes("delete1"),
Bytes.toBytes("delete3")))
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseBulkDelete(hbaseContext,
TableName.valueOf(tableName),
putRecord => new Delete(putRecord),
4)
assert(table.get(new Get(Bytes.toBytes("delete1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
assert(table.get(new Get(Bytes.toBytes("delete3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2"))
} finally {
table.close()
connection.close()
}
}
test("bulkGet to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
// Get with custom conversion logic
val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
record => {
new Get(record)
},
(result: Result) => {
if (result.listCells() != null) {
val it = result.listCells().iterator()
val B = new StringBuilder
B.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
if (q.equals("counter")) {
B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
"" + B.toString
} else {
""
}
})
val getArray = getRdd.collect()
assert(getArray.length == 4)
assert(getArray.contains("get1:(a,foo1)"))
assert(getArray.contains("get2:(a,foo2)"))
assert(getArray.contains("get3:(a,foo3)"))
}
test("bulkGet default converter to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
val getRdd = rdd.hbaseBulkGet(hbaseContext, TableName.valueOf("t1"), 2,
record => {
new Get(record)
}).map((row) => {
if (row != null && row._2.listCells() != null) {
val it = row._2.listCells().iterator()
val B = new StringBuilder
B.append(Bytes.toString(row._2.getRow) + ":")
while (it.hasNext) {
val cell = it.next
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
if (q.equals("counter")) {
B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
"" + B.toString
} else {
""
}})
val getArray = getRdd.collect()
assert(getArray.length == 4)
assert(getArray.contains("get1:(a,foo1)"))
assert(getArray.contains("get2:(a,foo2)"))
assert(getArray.contains("get3:(a,foo3)"))
}
test("foreachPartition with puts to test HBase client") {
val config = TEST_UTIL.getConfiguration
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1foreach"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
(Bytes.toBytes("2foreach"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
(Bytes.toBytes("3foreach"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
(Bytes.toBytes("4foreach"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
(Bytes.toBytes("5foreach"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
it.foreach((putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
bufferedMutator.mutate(put)
})
bufferedMutator.flush()
bufferedMutator.close()
})
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1foreach"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
assert(foo1 == "foo1")
val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2foreach"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
assert(foo2 == "foo2")
val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3foreach"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
assert(foo3 == "foo3")
val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4foreach"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
assert(foo4 == "foo")
val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5foreach"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
assert(foo5 == "bar")
} finally {
table.close()
connection.close()
}
}
test("mapPartitions with Get from test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
// Get with custom conversion logic
val getRdd = rdd.hbaseMapPartitions(hbaseContext, (it, conn) => {
val table = conn.getTable(TableName.valueOf("t1"))
var res = mutable.MutableList[String]()
it.foreach(r => {
val get = new Get(r)
val result = table.get(get)
if (result.listCells != null) {
val it = result.listCells().iterator()
val B = new StringBuilder
B.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next()
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
if (q.equals("counter")) {
B.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
B.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
res += "" + B.toString
} else {
res += ""
}
})
res.iterator
})
val getArray = getRdd.collect()
assert(getArray.length == 4)
assert(getArray.contains("get1:(a,foo1)"))
assert(getArray.contains("get2:(a,foo2)"))
assert(getArray.contains("get3:(a,foo3)"))
}
}
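// An illustrative usage sketch of the implicit RDD extensions exercised above:
// importing HBaseRDDFunctions._ adds hbaseBulkPut, hbaseBulkGet, hbaseBulkDelete
// and the partition-level hbaseForeachPartition/hbaseMapPartitions helpers to
// ordinary RDDs. Names such as HBaseRDDUsageSketch are illustrative; the sketch
// assumes a running SparkContext `sc`, an HBase Configuration `config`, and a
// table "t1" with column family "c".
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Put}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

object HBaseRDDUsageSketch {
  def run(sc: SparkContext, config: Configuration): Unit = {
    val hbaseContext = new HBaseContext(sc, config)
    val tableName = TableName.valueOf("t1")

    // Each record is (rowKey, Array((family, qualifier, value))), as in the tests.
    val putRdd = sc.parallelize(Seq(
      (Bytes.toBytes("row1"),
        Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("v1"))))))

    // Write through the RDD extension instead of calling HBaseContext directly.
    putRdd.hbaseBulkPut(hbaseContext, tableName, record => {
      val put = new Put(record._1)
      record._2.foreach(v => put.addColumn(v._1, v._2, v._3))
      put
    })

    // Read back with the default converter, which yields pairs whose second
    // element is the client Result.
    val keyRdd = sc.parallelize(Seq(Bytes.toBytes("row1")))
    val resultRdd = keyRdd.hbaseBulkGet(hbaseContext, tableName, 2, key => new Get(key))

    // Partition-level write: one shared Connection per partition, with puts
    // funnelled through a BufferedMutator as in the foreachPartition test.
    putRdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
      val mutator = conn.getBufferedMutator(tableName)
      it.foreach { record =>
        val put = new Put(record._1)
        record._2.foreach(v => put.addColumn(v._1, v._2, v._3))
        mutator.mutate(put)
      }
      mutator.flush()
      mutator.close()
    })
  }
}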

View File

@ -66,6 +66,7 @@
<module>hbase-rest</module>
<module>hbase-checkstyle</module>
<module>hbase-shaded</module>
<module>hbase-spark</module>
</modules>
<!--Add apache snapshots in case we want to use unreleased versions of plugins:
e.g. surefire 2.18-SNAPSHOT-->