From cb539917a2cb307060b0ea4330a52cd40fe7e150 Mon Sep 17 00:00:00 2001
From: Jonathan M Hsieh
Date: Tue, 26 Jan 2016 11:02:48 -0800
Subject: [PATCH] HBASE-15036 Update HBase Spark documentation to include bulk load with thin records (Ted Malaska)

---
 src/main/asciidoc/_chapters/spark.adoc | 91 ++++++++++++++++++++++----
 1 file changed, 79 insertions(+), 12 deletions(-)

diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc
index 37503e99392..b1bdb5daf08 100644
--- a/src/main/asciidoc/_chapters/spark.adoc
+++ b/src/main/asciidoc/_chapters/spark.adoc
@@ -210,19 +210,25 @@ to the HBase Connections in the executors
 
 == Bulk Load
 
-Spark bulk load follows very closely to the MapReduce implementation of bulk
-load. In short, a partitioner partitions based on region splits and
-the row keys are sent to the reducers in order, so that HFiles can be written
-out. In Spark terms, the bulk load will be focused around a
-`repartitionAndSortWithinPartitions` followed by a `foreachPartition`.
+There are two options for bulk loading data into HBase with Spark. There is the
+basic bulk load functionality that will work for cases where your rows have
+millions of columns and for cases where your columns are not consolidated and
+partitioned on the map side of the Spark bulk load process.
 
-The only major difference with the Spark implementation compared to the
-MapReduce implementation is that the column qualifier is included in the shuffle
-ordering process. This was done because the MapReduce bulk load implementation
-would have memory issues with loading rows with a large numbers of columns, as a
-result of the sorting of those columns being done in the memory of the reducer JVM.
-Instead, that ordering is done in the Spark Shuffle, so there should no longer
-be a limit to the number of columns in a row for bulk loading.
+There is also a thin record bulk load option with Spark. This second option is
+designed for tables that have fewer than 10k columns per row. The advantage
+of this second option is higher throughput and less overall load on the Spark
+shuffle operation.
+
+Both implementations work more or less like the MapReduce bulk load process, in
+that a partitioner partitions the row keys based on region splits, and
+the row keys are sent to the reducers in order so that HFiles can be written
+out directly from the reduce phase.
+
+In Spark terms, the bulk load is implemented around the Spark
+`repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.
+
+First, let's look at an example of using the basic bulk load functionality.
 
 .Bulk Loading Example
 ====
@@ -237,6 +243,11 @@ val config = new HBaseConfiguration()
 val hbaseContext = new HBaseContext(sc, config)
 
 val stagingFolder = ...
+val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+          (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
+      (Bytes.toBytes("3"),
+          (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
 
 rdd.hbaseBulkLoad(TableName.valueOf(tableName),
   t => {
@@ -290,6 +301,11 @@ val config = new HBaseConfiguration()
 val hbaseContext = new HBaseContext(sc, config)
 
 val stagingFolder = ...
+val rdd = sc.parallelize(Array(
+      (Bytes.toBytes("1"),
+          (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
+      (Bytes.toBytes("3"),
+          (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
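+// FamilyHFileWriteOptions below configures HFile writing for one column
+// family: (compression, bloomType, blockSize, dataBlockEncoding).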
 
 val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
 val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
@@ -318,6 +334,57 @@ load.doBulkLoad(new Path(stagingFolder.getPath),
 ----
 ====
 
+Now let's look at how you would call the thin record bulk load implementation.
+
+.Using thin record bulk load
+====
+
+[source, scala]
+----
+val sc = new SparkContext("local", "test")
+val config = new HBaseConfiguration()
+
+val hbaseContext = new HBaseContext(sc, config)
+
+val stagingFolder = ...
+val rdd = sc.parallelize(Array(
+      ("1",
+          (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
+      ("3",
+          (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
+
+rdd.hbaseBulkLoadThinRows(hbaseContext,
+  TableName.valueOf(tableName),
+  t => {
+    val rowKey = t._1
+
+    val familyQualifiersValues = new FamiliesQualifiersValues
+    t._2.foreach(f => {
+      val family: Array[Byte] = f._1
+      val qualifier = f._2
+      val value: Array[Byte] = f._3
+
+      familyQualifiersValues += (family, qualifier, value)
+    })
+    (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
+  },
+  stagingFolder.getPath,
+  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+  compactionExclude = false,
+  20)
+
+val load = new LoadIncrementalHFiles(config)
+load.doBulkLoad(new Path(stagingFolder.getPath),
+  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
+----
+====
+
+Note that the big difference when using bulk load for thin rows is that the
+map function returns a tuple whose first value is the row key and whose second
+value is a FamiliesQualifiersValues object, which contains all of the
+values for this row, across all column families.
+
+
 == SparkSQL/DataFrames
 
 http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports
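
For comparison with the note above: the second hunk cuts off the basic example
at `t => {`, so the map function passed to `hbaseBulkLoad` is not visible in
this patch. The following is a minimal sketch of that contract, assuming the
`KeyFamilyQualifier` key type from the hbase-spark module and the
`(rowKey, (family, qualifier, value))` tuples built in the example RDD; it is
illustrative only, not the exact text of spark.adoc.

[source, scala]
----
// Emit one (KeyFamilyQualifier, value) pair per cell; the Spark shuffle orders
// these by row key, family, and qualifier so HFiles can be written in order.
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
    val rowKey = t._1                  // row key bytes
    val family: Array[Byte] = t._2._1  // column family bytes
    val qualifier = t._2._2            // column qualifier bytes
    val value: Array[Byte] = t._2._3   // cell value bytes

    // A row with many columns would return an iterator over many such pairs.
    Seq((new KeyFamilyQualifier(rowKey, family, qualifier), value)).iterator
  },
  stagingFolder.getPath)
----

This makes the difference between the two options concrete: `hbaseBulkLoad`
shuffles one record per cell, while `hbaseBulkLoadThinRows` shuffles one record
per row (a `ByteArrayWrapper` row key plus a `FamiliesQualifiersValues` holding
all of that row's cells).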