HBASE-14184 Fix indention and type-o in JavaHBaseContext (Ted Malaska)

tedyu 2015-08-07 11:02:24 -07:00
parent f06daaf010
commit e53d2481ee
1 changed file with 288 additions and 293 deletions


@@ -17,22 +17,15 @@
package org.apache.hadoop.hbase.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
import org.apache.spark.streaming.api.java.JavaDStream

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

/**
@@ -44,304 +37,306 @@ import scala.reflect.ClassTag
 * @param config This is the config information to out HBase cluster
 */
class JavaHBaseContext(@transient jsc: JavaSparkContext,
                       @transient config: Configuration) extends Serializable {

  val hbaseContext = new HBaseContext(jsc.sc, config)

  /**
   * A simple enrichment of the traditional Spark javaRdd foreachPartition.
   * This function differs from the original in that it offers the
   * developer access to a already connected HConnection object
   *
   * Note: Do not close the HConnection object. All HConnection
   * management is handled outside this method
   *
   * @param javaRdd Original javaRdd with data to iterate over
   * @param f       Function to be given a iterator to iterate through
   *                the RDD values and a HConnection object to interact
   *                with HBase
   */
  def foreachPartition[T](javaRdd: JavaRDD[T],
                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
    hbaseContext.foreachPartition(javaRdd.rdd,
      (it: Iterator[T], conn: Connection) => {
        f.call((it, conn))
      })
  }

  /**
   * A simple enrichment of the traditional Spark Streaming dStream foreach
   * This function differs from the original in that it offers the
   * developer access to a already connected HConnection object
   *
   * Note: Do not close the HConnection object. All HConnection
   * management is handled outside this method
   *
   * @param javaDstream Original DStream with data to iterate over
   * @param f           Function to be given a iterator to iterate through
   *                    the JavaDStream values and a HConnection object to
   *                    interact with HBase
   */
  def foreachPartition[T](javaDstream: JavaDStream[T],
                          f: VoidFunction[(Iterator[T], Connection)]) = {
    hbaseContext.foreachPartition(javaDstream.dstream,
      (it: Iterator[T], conn: Connection) => f.call(it, conn))
  }

  /**
   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
   * This function differs from the original in that it offers the
   * developer access to a already connected HConnection object
   *
   * Note: Do not close the HConnection object. All HConnection
   * management is handled outside this method
   *
   * Note: Make sure to partition correctly to avoid memory issue when
   * getting data from HBase
   *
   * @param javaRdd Original JavaRdd with data to iterate over
   * @param f       Function to be given a iterator to iterate through
   *                the RDD values and a HConnection object to interact
   *                with HBase
   * @return        Returns a new RDD generated by the user definition
   *                function just like normal mapPartition
   */
  def mapPartitions[T, R](javaRdd: JavaRDD[T],
                          f: FlatMapFunction[(java.util.Iterator[T],
                            Connection), R]): JavaRDD[R] = {
    def fn = (it: Iterator[T], conn: Connection) =>
      asScalaIterator(
        f.call((asJavaIterator(it), conn)).iterator()
      )

    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
      (iterator: Iterator[T], connection: Connection) =>
        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
  }

  /**
   * A simple enrichment of the traditional Spark Streaming JavaDStream
   * mapPartition.
   *
   * This function differs from the original in that it offers the
   * developer access to a already connected HConnection object
   *
   * Note: Do not close the HConnection object. All HConnection
   * management is handled outside this method
   *
   * Note: Make sure to partition correctly to avoid memory issue when
   * getting data from HBase
   *
   * @param javaDstream Original JavaDStream with data to iterate over
   * @param mp          Function to be given a iterator to iterate through
   *                    the JavaDStream values and a HConnection object to
   *                    interact with HBase
   * @return            Returns a new JavaDStream generated by the user
   *                    definition function just like normal mapPartition
   */
  def streamMap[T, U](javaDstream: JavaDStream[T],
                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
  JavaDStream[U] = {
    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
      (it: Iterator[T], conn: Connection) =>
        mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allow addition support for a user to take JavaRDD
   * and generate puts and send them to HBase.
   * The complexity of managing the HConnection is
   * removed from the developer
   *
   * @param javaRdd   Original JavaRDD with data to iterate over
   * @param tableName The name of the table to put into
   * @param f         Function to convert a value in the JavaRDD
   *                  to a HBase Put
   */
  def bulkPut[T](javaRdd: JavaRDD[T],
                 tableName: TableName,
                 f: Function[(T), Put]) {
    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
  }

  /**
   * A simple abstraction over the HBaseContext.streamMapPartition method.
   *
   * It allow addition support for a user to take a JavaDStream and
   * generate puts and send them to HBase.
   *
   * The complexity of managing the HConnection is
   * removed from the developer
   *
   * @param javaDstream Original DStream with data to iterate over
   * @param tableName   The name of the table to put into
   * @param f           Function to convert a value in
   *                    the JavaDStream to a HBase Put
   */
  def streamBulkPut[T](javaDstream: JavaDStream[T],
                       tableName: TableName,
                       f: Function[T, Put]) = {
    hbaseContext.streamBulkPut(javaDstream.dstream,
      tableName,
      (t: T) => f.call(t))
  }

  /**
   * A simple abstraction over the HBaseContext.foreachPartition method.
   *
   * It allow addition support for a user to take a JavaRDD and
   * generate delete and send them to HBase.
   *
   * The complexity of managing the HConnection is
   * removed from the developer
   *
   * @param javaRdd   Original JavaRDD with data to iterate over
   * @param tableName The name of the table to delete from
   * @param f         Function to convert a value in the JavaRDD to a
   *                  HBase Deletes
   * @param batchSize The number of deletes to batch before sending to HBase
   */
  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
                    f: Function[T, Delete], batchSize: Integer) {
    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
  }

  /**
   * A simple abstraction over the HBaseContext.streamBulkMutation method.
   *
   * It allow addition support for a user to take a JavaDStream and
   * generate Delete and send them to HBase.
   *
   * The complexity of managing the HConnection is
   * removed from the developer
   *
   * @param javaDStream Original DStream with data to iterate over
   * @param tableName   The name of the table to delete from
   * @param f           Function to convert a value in the JavaDStream to a
   *                    HBase Delete
   * @param batchSize   The number of deletes to be sent at once
   */
  def streamBulkDelete[T](javaDStream: JavaDStream[T],
                          tableName: TableName,
                          f: Function[T, Delete],
                          batchSize: Integer) = {
    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
      (t: T) => f.call(t),
      batchSize)
  }

  /**
   * A simple abstraction over the HBaseContext.mapPartition method.
   *
   * It allow addition support for a user to take a JavaRDD and generates a
   * new RDD based on Gets and the results they bring back from HBase
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     batch size of how many gets to retrieve in a single fetch
   * @param javaRdd       Original JavaRDD with data to iterate over
   * @param makeGet       Function to convert a value in the JavaRDD to a
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      what ever the user wants to put in the resulting
   *                      JavaRDD
   * @return              New JavaRDD that is created by the Get to HBase
   */
  def bulkGet[T, U](tableName: TableName,
                    batchSize: Integer,
                    javaRdd: JavaRDD[T],
                    makeGet: Function[T, Get],
                    convertResult: Function[Result, U]): JavaRDD[U] = {
    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
      batchSize,
      javaRdd.rdd,
      (t: T) => makeGet.call(t),
      (r: Result) => {
        convertResult.call(r)
      })(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * A simple abstraction over the HBaseContext.streamMap method.
   *
   * It allow addition support for a user to take a DStream and
   * generates a new DStream based on Gets and the results
   * they bring back from HBase
   *
   * @param tableName     The name of the table to get from
   * @param batchSize     The number of gets to be batched together
   * @param javaDStream   Original DStream with data to iterate over
   * @param makeGet       Function to convert a value in the JavaDStream to a
   *                      HBase Get
   * @param convertResult This will convert the HBase Result object to
   *                      what ever the user wants to put in the resulting
   *                      JavaDStream
   * @return              New JavaDStream that is created by the Get to HBase
   */
  def streamBulkGet[T, U](tableName: TableName,
                          batchSize: Integer,
                          javaDStream: JavaDStream[T],
                          makeGet: Function[T, Get],
                          convertResult: Function[Result, U]) {
    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
      batchSize,
      javaDStream.dstream,
      (t: T) => makeGet.call(t),
      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * This function will use the native HBase TableInputFormat with the
   * given scan object to generate a new JavaRDD
   *
   * @param tableName The name of the table to scan
   * @param scans     The HBase scan object to use to read data from HBase
   * @param f         Function to convert a Result object from HBase into
   *                  What the user wants in the final generated JavaRDD
   * @return          New JavaRDD with results from scan
   */
  def hbaseRDD[U](tableName: TableName,
                  scans: Scan,
                  f: Function[(ImmutableBytesWritable, Result), U]):
  JavaRDD[U] = {
    JavaRDD.fromRDD(
      hbaseContext.hbaseRDD[U](tableName,
        scans,
        (v: (ImmutableBytesWritable, Result)) =>
          f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
  }

  /**
   * A overloaded version of HBaseContext hbaseRDD that define the
   * type of the resulting JavaRDD
   *
   * @param tableName The name of the table to scan
   * @param scans     The HBase scan object to use to read data from HBase
   * @return          New JavaRDD with results from scan
   */
  def hbaseRDD(tableName: TableName,
               scans: Scan):
  JavaRDD[(ImmutableBytesWritable, Result)] = {
    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
  }

  /**
   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
   *
   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
   *
   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
   * just worse performance or security issues.
   * For instance, an Array[AnyRef] can hold any type T,
   * but may lose primitive
   * specialization.
   */
  private[spark]
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}
