HBASE-14158 Add documentation for Initial Release for HBase-Spark Module integration
Signed-off-by: Misty Stanley-Jones <mstanleyjones@cloudera.com>
////
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
////

[[spark]]
= HBase and Spark
:doctype: book
:numbered:
:toc: left
:icons: font
:experimental:

link:http://spark.apache.org/[Apache Spark] is a software framework that is used
to process data in memory in a distributed manner, and is replacing MapReduce in
many use cases.

Spark itself is out of scope of this document; refer to the Spark site for
more information on the Spark project and subprojects. This document focuses
on four main interaction points between Spark and HBase. Those interaction points are:

Basic Spark::
The ability to have an HBase Connection at any point in your Spark DAG.
Spark Streaming::
The ability to have an HBase Connection at any point in your Spark Streaming
application.
Spark Bulk Load::
The ability to write directly to HBase HFiles for bulk insertion into HBase.
SparkSQL/DataFrames::
The ability to write SparkSQL that draws on tables that are represented in HBase.

The following sections walk through examples of all these interaction points.

== Basic Spark

This section discusses Spark HBase integration at the lowest and simplest levels.
All the other interaction points are built upon the concepts that are described
here.

At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
takes in HBase configurations and pushes them to the Spark executors. This allows
us to have an HBase Connection per Spark Executor in a static location.

For reference, Spark Executors can be on the same nodes as the Region Servers or
on different nodes; there is no dependence on co-location. Think of every Spark
Executor as a multi-threaded client application. This allows any Spark Tasks
running on the executors to access the shared Connection object.

.HBaseContext Usage Example
====

This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD
in Scala:

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

...

val hbaseContext = new HBaseContext(sc, config)

rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
  it.foreach((putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    bufferedMutator.mutate(put)
  })
  bufferedMutator.flush()
  bufferedMutator.close()
})
----

Here is the same example implemented in Java:

[source, java]
----
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  ...
  list.add(Bytes.toBytes("5"));

  JavaRDD<byte[]> rdd = jsc.parallelize(list);

  Configuration conf = HBaseConfiguration.create();

  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  hbaseContext.foreachPartition(rdd,
      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
    public void call(Tuple2<Iterator<byte[]>, Connection> t)
        throws Exception {
      Table table = t._2().getTable(TableName.valueOf(tableName));
      BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
      while (t._1().hasNext()) {
        byte[] b = t._1().next();
        Result r = table.get(new Get(b));
        if (r.getExists()) {
          mutator.mutate(new Put(b));
        }
      }

      mutator.flush();
      mutator.close();
      table.close();
    }
  });
} finally {
  jsc.stop();
}
----
====

All functionality between Spark and HBase will be supported both in Scala and in
Java, with the exception of SparkSQL, which will support any language that is
supported by Spark. For the remainder of this documentation, we will focus on
Scala examples for now.

The examples above illustrate how to do a `foreachPartition` with a connection. A
number of other Spark base functions are supported out of the box:

// tag::spark_base_functions[]
`bulkPut`:: For massively parallel sending of puts to HBase
`bulkDelete`:: For massively parallel sending of deletes to HBase
`bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD
`mapPartition`:: To do a Spark Map function with a Connection object to allow full
access to HBase
`hBaseRDD`:: To simplify a distributed scan to create an RDD
// end::spark_base_functions[]

For examples of all these functionalities, see the HBase-Spark Module.
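
The following is a minimal, hedged sketch of what a couple of these calls can look
like. The call shapes (parameter order, the delete batch size) are assumptions based
on the function list above and the DStream `hbaseBulkPut` example later in this
chapter; check the `HBaseContext` class in the HBase-Spark module for the
authoritative signatures. The table name `t1` and the sample records are made up,
and imports are omitted as in the other snippets in this chapter.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

// An RDD of (rowKey, Array((family, qualifier, value))) records to write.
val putRdd = sc.parallelize(Seq(
  (Bytes.toBytes("1"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("v1")))),
  (Bytes.toBytes("2"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("v2"))))))

// bulkPut: each record is converted into a Put on the executors, which share one
// Connection per executor (method shape assumed; see HBaseContext for the real API).
hbaseContext.bulkPut(putRdd,
  TableName.valueOf("t1"),
  (record: (Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])) => {
    val put = new Put(record._1)
    record._2.foreach((cell) => put.addColumn(cell._1, cell._2, cell._3))
    put
  })

// bulkDelete: each row key becomes a Delete, sent in batches of 4
// (batch-size parameter assumed).
val deleteRdd = sc.parallelize(Seq(Bytes.toBytes("1"), Bytes.toBytes("2")))
hbaseContext.bulkDelete(deleteRdd,
  TableName.valueOf("t1"),
  (rowKey: Array[Byte]) => new Delete(rowKey),
  4)
----
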
== Spark Streaming

http://spark.apache.org/streaming/[Spark Streaming] is a micro batching stream
processing framework built on top of Spark. HBase and Spark Streaming make great
companions in that HBase can provide the following benefits alongside Spark
Streaming:

* A place to grab reference data or profile data on the fly
* A place to store counts or aggregates in a way that supports Spark Streaming's
promise of _only once processing_.

The HBase-Spark module's integration points with Spark Streaming are similar to
its normal Spark integration points, in that the following commands are possible
straight off a Spark Streaming DStream.

include::spark.adoc[tags=spark_base_functions]

.`bulkPut` Example with DStreams
====

Below is an example of bulkPut with DStreams. It is very close in feel to the RDD
bulk put.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))

val rdd1 = ...
val rdd2 = ...

val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
    Array[Byte], Array[Byte])])]]()

queue += rdd1
queue += rdd2

val dStream = ssc.queueStream(queue)

dStream.hbaseBulkPut(
  hbaseContext,
  TableName.valueOf(tableName),
  (putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    put
  })
----

There are three inputs to the `hbaseBulkPut` function:

. The hbaseContext that carries the configuration broadcast information that links
us to the HBase Connections in the executors
. The table name of the table we are putting data into
. A function that converts a record in the DStream into an HBase Put object
====
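
The snippet above only builds the DStream pipeline; in a real Spark Streaming
application you still need to start the StreamingContext and wait for it to
terminate. A minimal sketch of that final step, assuming the `ssc` from the example
above:

[source, scala]
----
// Start processing the queued RDDs as micro batches and block until stopped.
ssc.start()
ssc.awaitTermination()
----
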
== Bulk Load

Spark bulk load follows the MapReduce implementation of bulk load very closely.
In short, a partitioner partitions based on region splits and
the row keys are sent to the reducers in order, so that HFiles can be written
out. In Spark terms, the bulk load is focused around a
`repartitionAndSortWithinPartitions` followed by a `foreachPartition`.

The only major difference between the Spark implementation and the
MapReduce implementation is that the column qualifier is included in the shuffle
ordering process. This was done because the MapReduce bulk load implementation
would have memory issues with loading rows with a large number of columns, as a
result of the sorting of those columns being done in the memory of the reducer JVM.
Instead, that ordering is done in the Spark Shuffle, so there should no longer
be a limit to the number of columns in a row for bulk loading.
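
To make that ordering concrete, here is an illustrative sketch, not the module's
actual API, of a sort key ordered by row key, then family, then qualifier, plus a
region-based partitioner. This is roughly the shuffle shape that `hbaseBulkLoad`
sets up internally with its `KeyFamilyQualifier` key before HFiles are written; the
class names `CellSortKey` and `RegionPartitioner` are hypothetical, and imports are
omitted as in the other snippets.

[source, scala]
----
// Illustrative only: a shuffle key ordered by row key, then family, then qualifier.
case class CellSortKey(rowKey: Array[Byte], family: Array[Byte], qualifier: Array[Byte])
    extends Ordered[CellSortKey] {
  def compare(that: CellSortKey): Int = {
    val rowCmp = Bytes.compareTo(rowKey, that.rowKey)
    if (rowCmp != 0) rowCmp
    else {
      val familyCmp = Bytes.compareTo(family, that.family)
      if (familyCmp != 0) familyCmp
      else Bytes.compareTo(qualifier, that.qualifier)
    }
  }
}

// Illustrative only: route every cell to the partition owning its row's region,
// so each partition can write one ordered HFile in a foreachPartition.
class RegionPartitioner(regionStartKeys: Array[Array[Byte]]) extends Partitioner {
  override def numPartitions: Int = regionStartKeys.length
  override def getPartition(key: Any): Int = {
    val rowKey = key.asInstanceOf[CellSortKey].rowKey
    val idx = regionStartKeys.lastIndexWhere(start => Bytes.compareTo(start, rowKey) <= 0)
    math.max(idx, 0)
  }
}

// The overall shape: partition by region, sort by (row, family, qualifier) within
// each partition, then write HFiles from the ordered cells.
// val sorted = cellRdd.repartitionAndSortWithinPartitions(new RegionPartitioner(startKeys))
// sorted.foreachPartition(cells => { /* write one HFile from the ordered cells */ })
----

In practice you do not write any of this yourself; `hbaseBulkLoad` builds the key,
the partitioner from the table's region boundaries, and the HFile writer for you,
as the examples below show.
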
.Bulk Loading Example
====

The following example shows bulk loading with Spark.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family: Array[Byte] = t._2(0)._1
    val qualifier = t._2(0)._2
    val value = t._2(0)._3

    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----
====

The `hbaseBulkLoad` function takes three required parameters:

. The table name of the table we intend to bulk load to.

. A function that converts a record in the RDD to a tuple key-value pair, with
the tuple key being a KeyFamilyQualifier object and the value being the cell value.
The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier.
The shuffle partitions on the RowKey but sorts by all three values.

. The temporary path for the HFile to be written out to.

Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object
to load the newly created HFiles into HBase.

.Additional Parameters for Bulk Loading with Spark

You can set the following attributes with additional parameter options on `hbaseBulkLoad`:

* Max file size of the HFiles
* A flag to exclude HFiles from compactions
* Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding

.Using Additional Parameters
====

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...

val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family: Array[Byte] = t._2(0)._1
    val qualifier = t._2(0)._2
    val value = t._2(0)._3

    val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----
====

== SparkSQL/DataFrames

http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports
SQL that will compute down to a Spark DAG. In addition, SparkSQL is a heavy user
of DataFrames. DataFrames are like RDDs with schema information.

The HBase-Spark module includes support for Spark SQL and DataFrames, which allows
you to write SparkSQL directly on HBase tables. In addition, the HBase-Spark module
will push down query filtering logic to HBase.

=== Predicate Push Down

There are two examples of predicate push down in the HBase-Spark implementation.
The first example shows the push down of filtering logic on the RowKey. HBase-Spark
will reduce the filters on RowKeys down to a set of Get and/or Scan commands.

NOTE: The Scans are distributed scans, rather than a single client scan operation.

If the query looks something like the following, the logic will push down and get
the rows through 3 Gets and 0 Scans. We can do Gets because all the operations
are `equal` operations.

[source,sql]
----
SELECT
  KEY_FIELD,
  B_FIELD,
  A_FIELD
FROM hbaseTmp
WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
----

Now let's look at an example where we will end up doing two scans on HBase.

[source, sql]
----
SELECT
  KEY_FIELD,
  B_FIELD,
  A_FIELD
FROM hbaseTmp
WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
----

In this example we will get 0 Gets and 2 Scans. One scan will load everything
from the first row in the table until "get2", and the second scan will get
everything from "get3" until the last row in the table.
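
As a rough illustration, and leaving aside the exact inclusive/exclusive boundary
handling the module applies, those two scans correspond to something like the
following client-side ranges:

[source, scala]
----
// Illustrative only: approximate ranges for WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'.
val scan1 = new Scan()                    // from the start of the table...
scan1.setStopRow(Bytes.toBytes("get2"))   // ...up to (but not including) "get2"

val scan2 = new Scan()
scan2.setStartRow(Bytes.toBytes("get3"))  // from "get3" onward (the strict > boundary
                                          // is simplified here) to the end of the table
----
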
The next query is a good example of having a good deal of range checks. However,
the ranges overlap. The code will be smart enough to consolidate the following data
into a single scan that encompasses all the data asked for by the query.

[source, sql]
----
SELECT
  KEY_FIELD,
  B_FIELD,
  A_FIELD
FROM hbaseTmp
WHERE
  (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
  (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
----

The second example of push down functionality offered by the HBase-Spark module
is the ability to push down filter logic for column and cell fields. Just like
the RowKey logic, all query logic will be consolidated into the minimum number
of range checks and equal checks, by sending a Filter object along with the Scan
with information about consolidated push down predicates.

.SparkSQL Code Example
====
This example shows how we can interact with HBase with SQL.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

new HBaseContext(sc, config)
val sqlContext = new SQLContext(sc)

val df = sqlContext.load("org.apache.hadoop.hbase.spark",
  Map("hbase.columns.mapping" ->
    "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
    "hbase.table" -> "t1"))

df.registerTempTable("hbaseTmp")

val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
  "WHERE " +
  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
----

There are three major parts of this example that deserve explaining.

The sqlContext.load function::
In the sqlContext.load function we see two
parameters. The first of these parameters is pointing Spark to the HBase
DefaultSource class that will act as the interface between SparkSQL and HBase.

A map of key value pairs::
In this example we have two keys in our map, `hbase.columns.mapping` and
`hbase.table`. The `hbase.table` key directs SparkSQL to use the given HBase table.
The `hbase.columns.mapping` key gives us the logic to translate HBase columns to
SparkSQL columns.
+
The `hbase.columns.mapping` is a string that follows the following format:
+
[source, scala]
----
(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
----
+
In the example below we see the definition of three fields. Because KEY_FIELD has
no ColumnFamily, it is the RowKey.
+
----
KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
----

The registerTempTable function::
This is a SparkSQL function that allows us to be free of Scala when accessing
our HBase table directly with SQL, using the table name "hbaseTmp".

The last major point to note in the example is the `sqlContext.sql` function, which
allows the user to ask their questions in SQL, which will be pushed down to the
DefaultSource code in the HBase-Spark module. The result of this command will be
a DataFrame with the schema of KEY_FIELD and B_FIELD.
====