From 86f37686278fa211e349da9e544eed5e1d3288c5 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Thu, 2 Jun 2016 16:17:33 -0700 Subject: [PATCH] HBASE-15473: Documentation for the usage of hbase dataframe user api (JSON, Avro, etc) --- src/main/asciidoc/_chapters/spark.adoc | 390 +++++++++++++++++-------- 1 file changed, 263 insertions(+), 127 deletions(-) diff --git a/src/main/asciidoc/_chapters/spark.adoc b/src/main/asciidoc/_chapters/spark.adoc index 88918aa366f..774d137fc34 100644 --- a/src/main/asciidoc/_chapters/spark.adoc +++ b/src/main/asciidoc/_chapters/spark.adoc @@ -384,29 +384,157 @@ returns a tuple with the first value being the row key and the second value being an object of FamiliesQualifiersValues, which will contain all the values for this row for all column families. - == SparkSQL/DataFrames -http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports -SQL that will compute down to a Spark DAG. In addition,SparkSQL is a heavy user -of DataFrames. DataFrames are like RDDs with schema information. +HBase-Spark Connector (in HBase-Spark Module) leverages +link:https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html[DataSource API] +(link:https://issues.apache.org/jira/browse/SPARK-3247[SPARK-3247]) +introduced in Spark-1.2.0, bridges the gap between simple HBase KV store and complex +relational SQL queries and enables users to perform complex data analytical work +on top of HBase using Spark. HBase Dataframe is a standard Spark Dataframe, and is able to +interact with any other data sources such as Hive, Orc, Parquet, JSON, etc. +HBase-Spark Connector applies critical techniques such as partition pruning, column pruning, +predicate pushdown and data locality. -The HBase-Spark module includes support for Spark SQL and DataFrames, which allows -you to write SparkSQL directly on HBase tables. In addition the HBase-Spark -will push down query filtering logic to HBase. +To use HBase-Spark connector, users need to define the Catalog for the schema mapping +between HBase and Spark tables, prepare the data and populate the HBase table, +then load HBase DataFrame. After that, users can do integrated query and access records +in HBase table with SQL query. Following illustrates the basic procedure. -In HBaseSparkConf, four parameters related to timestamp can be set. They are TIMESTAMP, -MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records -with different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP. -In the meantime, use concrete value instead of tsSpecified and oldMs in the examples below. +=== Define catalog + +[source, scala] +---- +def catalog = s"""{ +       |"table":{"namespace":"default", "name":"table1"}, +       |"rowkey":"key", +       |"columns":{ +         |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, +         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, +         |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, +         |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, +         |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, +         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, +         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, +         |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, +         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} +       |} +     |}""".stripMargin +---- + +Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this catalog. +One is the rowkey definition and the other is the mapping between table column in Spark and +the column family and column qualifier in HBase. The above defines a schema for a HBase table +with name as table1, row key as key and a number of columns (col1 `-` col8). Note that the rowkey +also has to be defined in details as a column (col0), which has a specific cf (rowkey). + +=== Save the DataFrame + +[source, scala] +---- +case class HBaseRecord( + col0: String, + col1: Boolean, + col2: Double, + col3: Float, + col4: Int,        + col5: Long, + col6: Short, + col7: String, + col8: Byte) + +object HBaseRecord +{                                                                                                              + def apply(i: Int, t: String): HBaseRecord = { + val s = s"""row${"%03d".format(i)}"""        + HBaseRecord(s, + i % 2 == 0, + i.toDouble, + i.toFloat,   + i, + i.toLong, + i.toShort,   + s"String$i: $t",       + i.toByte) + } +} + +val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")} + +sc.parallelize(data).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) + .format("org.apache.hadoop.hbase.spark ") + .save() + +---- +`data` prepared by the user is a local Scala collection which has 256 HBaseRecord objects. +`sc.parallelize(data)` function distributes `data` to form an RDD. `toDF` returns a DataFrame. +`write` function returns a DataFrameWriter used to write the DataFrame to external storage +systems (e.g. HBase here). Given a DataFrame with specified schema `catalog`, `save` function +will create an HBase table with 5 regions and save the DataFrame inside. + +=== Load the DataFrame + +[source, scala] +---- +def withCatalog(cat: String): DataFrame = { + sqlContext + .read + .options(Map(HBaseTableCatalog.tableCatalog->cat)) + .format("org.apache.hadoop.hbase.spark") + .load() +} +val df = withCatalog(catalog) +---- +In ‘withCatalog’ function, sqlContext is a variable of SQLContext, which is the entry point +for working with structured data (rows and columns) in Spark. +`read` returns a DataFrameReader that can be used to read data in as a DataFrame. +`option` function adds input options for the underlying data source to the DataFrameReader, +and `format` function specifies the input data source format for the DataFrameReader. +The `load()` function loads input in as a DataFrame. The date frame `df` returned +by `withCatalog` function could be used to access HBase table, such as 4.4 and 4.5. + +=== Language Integrated Query + +[source, scala] +---- +val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") || + $"col0" === "row005" || + $"col0" <= "row005") + .select("col0", "col1", "col4") +s.show +---- +DataFrame can do various operations, such as join, sort, select, filter, orderBy and so on. +`df.filter` above filters rows using the given SQL expression. `select` selects a set of columns: +`col0`, `col1` and `col4`. + +=== SQL Query + +[source, scala] +---- +df.registerTempTable("table1") +sqlContext.sql("select count(col1) from table1").show +---- + +`registerTempTable` registers `df` DataFrame as a temporary table using the table name `table1`. +The lifetime of this temporary table is tied to the SQLContext that was used to create `df`. +`sqlContext.sql` function allows the user to execute SQL queries. + +=== Others .Query with different timestamps ==== +In HBaseSparkConf, four parameters related to timestamp can be set. They are TIMESTAMP, +MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records with +different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP. In the meantime, +use concrete value instead of tsSpecified and oldMs in the examples below. The example below shows how to load df DataFrame with different timestamps. tsSpecified is specified by the user. HBaseTableCatalog defines the HBase and Relation relation schema. writeCatalog defines catalog for the schema mapping. + +[source, scala] ---- val df = sqlContext.read .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString)) @@ -416,6 +544,8 @@ val df = sqlContext.read The example below shows how to load df DataFrame with different time ranges. oldMs is specified by the user. + +[source, scala] ---- val df = sqlContext.read .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0", @@ -423,132 +553,138 @@ val df = sqlContext.read .format("org.apache.hadoop.hbase.spark") .load() ---- - After loading df DataFrame, users can query data. ----- - df.registerTempTable("table") - sqlContext.sql("select count(col1) from table").show ----- -==== - -=== Predicate Push Down - -There are two examples of predicate push down in the HBase-Spark implementation. -The first example shows the push down of filtering logic on the RowKey. HBase-Spark -will reduce the filters on RowKeys down to a set of Get and/or Scan commands. - -NOTE: The Scans are distributed scans, rather than a single client scan operation. - -If the query looks something like the following, the logic will push down and get -the rows through 3 Gets and 0 Scans. We can do gets because all the operations -are `equal` operations. - -[source,sql] ----- -SELECT - KEY_FIELD, - B_FIELD, - A_FIELD -FROM hbaseTmp -WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3') ----- - -Now let's look at an example where we will end up doing two scans on HBase. - -[source, sql] ----- -SELECT - KEY_FIELD, - B_FIELD, - A_FIELD -FROM hbaseTmp -WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3' ----- - -In this example we will get 0 Gets and 2 Scans. One scan will load everything -from the first row in the table until “get2” and the second scan will get -everything from “get3” until the last row in the table. - -The next query is a good example of having a good deal of range checks. However -the ranges overlap. To the code will be smart enough to get the following data -in a single scan that encompasses all the data asked by the query. - -[source, sql] ----- -SELECT - KEY_FIELD, - B_FIELD, - A_FIELD -FROM hbaseTmp -WHERE - (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or - (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5') ----- - -The second example of push down functionality offered by the HBase-Spark module -is the ability to push down filter logic for column and cell fields. Just like -the RowKey logic, all query logic will be consolidated into the minimum number -of range checks and equal checks by sending a Filter object along with the Scan -with information about consolidated push down predicates - -.SparkSQL Code Example -==== -This example shows how we can interact with HBase with SQL. [source, scala] ---- -val sc = new SparkContext("local", "test") -val config = new HBaseConfiguration() - -new HBaseContext(sc, TEST_UTIL.getConfiguration) -val sqlContext = new SQLContext(sc) - -df = sqlContext.load("org.apache.hadoop.hbase.spark", - Map("hbase.columns.mapping" -> - "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b", - "hbase.table" -> "t1")) - -df.registerTempTable("hbaseTmp") - -val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " + - "WHERE " + - "(KEY_FIELD = 'get1' and B_FIELD < '3') or " + - "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5) +df.registerTempTable("table") +sqlContext.sql("select count(col1) from table").show ---- +==== -There are three major parts of this example that deserve explaining. +.Native Avro support +==== +HBase-Spark Connector support different data formats like Avro, Jason, etc. The use case below +shows how spark supports Avro. User can persist the Avro record into HBase directly. Internally, +the Avro schema is converted to a native Spark Catalyst data type automatically. +Note that both key-value parts in an HBase table can be defined in Avro format. -The sqlContext.load function:: - In the sqlContext.load function we see two - parameters. The first of these parameters is pointing Spark to the HBase - DefaultSource class that will act as the interface between SparkSQL and HBase. +1) Define catalog for the schema mapping: -A map of key value pairs:: - In this example we have two keys in our map, `hbase.columns.mapping` and - `hbase.table`. The `hbase.table` directs SparkSQL to use the given HBase table. - The `hbase.columns.mapping` key give us the logic to translate HBase columns to - SparkSQL columns. -+ -The `hbase.columns.mapping` is a string that follows the following format -+ [source, scala] ---- -(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier) ----- -+ -In the example below we see the definition of three fields. Because KEY_FIELD has -no ColumnFamily, it is the RowKey. -+ ----- -KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b +def catalog = s"""{ + |"table":{"namespace":"default", "name":"Avrotable"}, + |"rowkey":"key", + |"columns":{ + |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, + |"col1":{"cf":"cf1", "col":"col1", "type":"binary"} + |} + |}""".stripMargin ---- -The registerTempTable function:: - This is a SparkSQL function that allows us now to be free of Scala when accessing - our HBase table directly with SQL with the table name of "hbaseTmp". +`catalog` is a schema for a HBase table named `Avrotable`. row key as key and +one column col1. The rowkey also has to be defined in details as a column (col0), +which has a specific cf (rowkey). -The last major point to note in the example is the `sqlContext.sql` function, which -allows the user to ask their questions in SQL which will be pushed down to the -DefaultSource code in the HBase-Spark module. The result of this command will be -a DataFrame with the Schema of KEY_FIELD and B_FIELD. -==== +2) Prepare the Data: + +[source, scala] +---- + object AvroHBaseRecord { + val schemaString = + s"""{"namespace": "example.avro", + | "type": "record", "name": "User", + | "fields": [ + | {"name": "name", "type": "string"}, + | {"name": "favorite_number", "type": ["int", "null"]}, + | {"name": "favorite_color", "type": ["string", "null"]}, + | {"name": "favorite_array", "type": {"type": "array", "items": "string"}}, + | {"name": "favorite_map", "type": {"type": "map", "values": "int"}} + | ] }""".stripMargin + + val avroSchema: Schema = { + val p = new Schema.Parser + p.parse(schemaString) + } + + def apply(i: Int): AvroHBaseRecord = { + val user = new GenericData.Record(avroSchema); + user.put("name", s"name${"%03d".format(i)}") + user.put("favorite_number", i) + user.put("favorite_color", s"color${"%03d".format(i)}") + val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema()) + favoriteArray.add(s"number${i}") + favoriteArray.add(s"number${i+1}") + user.put("favorite_array", favoriteArray) + import collection.JavaConverters._ + val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava + user.put("favorite_map", favoriteMap) + val avroByte = AvroSedes.serialize(user, avroSchema) + AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte) + } + } + + val data = (0 to 255).map { i => + AvroHBaseRecord(i) + } +---- + +`schemaString` is defined first, then it is parsed to get `avroSchema`. `avroSchema` is used to +generate `AvroHBaseRecord`. `data` prepared by users is a local Scala collection +which has 256 `AvroHBaseRecord` objects. + +3) Save DataFrame: + +[source, scala] +---- + sc.parallelize(data).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) + .format("org.apache.spark.sql.execution.datasources.hbase") + .save() +---- + +Given a data frame with specified schema `catalog`, above will create an HBase table with 5 +regions and save the data frame inside. + +4) Load the DataFrame + +[source, scala] +---- +def avroCatalog = s"""{ + |"table":{"namespace":"default", "name":"avrotable"}, + |"rowkey":"key", + |"columns":{ + |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, + |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"} + |} + |}""".stripMargin + + def withCatalog(cat: String): DataFrame = { + sqlContext + .read + .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog)) + .format("org.apache.spark.sql.execution.datasources.hbase") + .load() + } + val df = withCatalog(catalog) +---- + +In `withCatalog` function, `read` returns a DataFrameReader that can be used to read data in as a DataFrame. +The `option` function adds input options for the underlying data source to the DataFrameReader. +There are two options: one is to set `avroSchema` as `AvroHBaseRecord.schemaString`, and one is to +set `HBaseTableCatalog.tableCatalog` as `avroCatalog`. The `load()` function loads input in as a DataFrame. +The date frame `df` returned by `withCatalog` function could be used to access the HBase table. + +5) SQL Query + +[source, scala] +---- + df.registerTempTable("avrotable") + val c = sqlContext.sql("select count(1) from avrotable"). +---- + +After loading df DataFrame, users can query data. registerTempTable registers df DataFrame +as a temporary table using the table name avrotable. `sqlContext.sql` function allows the +user to execute SQL queries. +==== \ No newline at end of file