HBASE-15036 Update HBase Spark documentation to include bulk load with thin records (Ted Malaska)

Jonathan M Hsieh 2016-01-26 11:02:48 -08:00
parent 4ca27a68ef
commit cb539917a2
1 changed file with 79 additions and 12 deletions


@@ -210,19 +210,25 @@ to the HBase Connections in the executors
== Bulk Load
There are two options for bulk loading data into HBase with Spark. There is the
basic bulk load functionality that will work for cases where your rows have
millions of columns and for cases where your columns are not consolidated and
partitioned before the map side of the Spark bulk load process.

There is also a thin record bulk load option with Spark. This second option is
designed for tables that have fewer than 10k columns per row. The advantage
of this second option is higher throughput and less overall load on the Spark
shuffle operation.

Both implementations work more or less like the MapReduce bulk load process, in
that a partitioner partitions the row keys based on region splits and
the row keys are sent to the reducers in order, so that HFiles can be written
out directly from the reduce phase.

The only major difference between the Spark implementation and the
MapReduce implementation is that the column qualifier is included in the shuffle
ordering process. This was done because the MapReduce bulk load implementation
would have memory issues when loading rows with a large number of columns, as a
result of the sorting of those columns being done in the memory of the reducer JVM.
Instead, that ordering is done in the Spark shuffle, so there should no longer
be a limit to the number of columns in a row for bulk loading.

In Spark terms, the bulk load is implemented around a Spark
`repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.
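The following is a conceptual sketch of that shuffle pattern using plain Spark
primitives only. It is not the hbase-spark API: the `HashPartitioner` stands in
for the region-split-based partitioner the real implementation builds, and the
HFile writing is left as a comment.

[source, scala]
----
import org.apache.spark.{HashPartitioner, SparkContext}

val sc = new SparkContext("local", "bulk-load-shuffle-sketch")

// (rowKey, value) pairs; in the real bulk load the shuffle key also carries
// the column family and qualifier so that cells sort into HFile order.
val keyed = sc.parallelize(Seq(("rowB", "v2"), ("rowA", "v1"), ("rowC", "v3")))

keyed
  // HashPartitioner stands in for the region-split partitioner; the important
  // part is that each partition's keys arrive already sorted.
  .repartitionAndSortWithinPartitions(new HashPartitioner(2))
  .foreachPartition { sortedCells =>
    // The real implementation appends each cell to an HFile writer here,
    // which is only possible because the iterator is in sorted order.
    sortedCells.foreach { case (rowKey, value) => println(s"$rowKey -> $value") }
  }
----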
First, let's look at an example of using the basic bulk load functionality.
.Bulk Loading Example
====
@@ -237,6 +243,11 @@ val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  (Bytes.toBytes("3"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
@@ -290,6 +301,11 @@ val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
@@ -318,6 +334,57 @@ load.doBulkLoad(new Path(stagingFolder.getPath),
----
====
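Each bulk load example finishes by handing the generated HFiles to
`LoadIncrementalHFiles.doBulkLoad`, which needs an HBase `Connection`, `Admin`,
`Table`, and `RegionLocator` that the snippets do not show being created. A
minimal sketch of that setup, using only the standard HBase client API (nothing
here is specific to the hbase-spark module), might look like this:

[source, scala]
----
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Assumes `config`, `tableName`, and `stagingFolder` are the same values used
// when the HFiles were written above.
val conn = ConnectionFactory.createConnection(config)
try {
  val table = conn.getTable(TableName.valueOf(tableName))
  val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))

  val load = new LoadIncrementalHFiles(config)
  load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, regionLocator)
} finally {
  conn.close()
}
----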
Now let's look at how you would call the thin record bulk load implementation.
.Using thin record bulk load
====
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
("1",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoadThinRows(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val familyQualifiersValues = new FamiliesQualifiersValues
    t._2.foreach(f => {
      val family: Array[Byte] = f._1
      val qualifier = f._2
      val value: Array[Byte] = f._3
      familyQualifiersValues += (family, qualifier, value)
    })
    (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
  },
  stagingFolder.getPath,
  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
  compactionExclude = false,
  20)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----
====
Note that the big difference when using bulk load for thin rows is that the map
function returns a tuple whose first value is the row key and whose second value
is a FamiliesQualifiersValues object, which contains all of the values for this
row across all column families.
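As a quick illustration of that return shape, here is a minimal sketch of a
helper that builds the `(ByteArrayWrapper, FamiliesQualifiersValues)` tuple for
a single row. The `buildThinRow` name is made up for illustration, and the
imports assume these classes live in the hbase-spark module's
`org.apache.hadoop.hbase.spark` package.

[source, scala]
----
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper, not part of the hbase-spark API: collects every cell of
// one logical row, across all of its column families, into the tuple shape
// that hbaseBulkLoadThinRows expects back from the map function.
def buildThinRow(rowKey: String,
                 cells: Seq[(Array[Byte], Array[Byte], Array[Byte])])
  : (ByteArrayWrapper, FamiliesQualifiersValues) = {
  val familyQualifiersValues = new FamiliesQualifiersValues
  cells.foreach { case (family, qualifier, value) =>
    familyQualifiersValues += (family, qualifier, value)
  }
  (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
}
----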
== SparkSQL/DataFrames
http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports