HBASE-15473: Documentation for the usage of hbase dataframe user api (JSON, Avro, etc)

== SparkSQL/DataFrames

http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports
SQL queries which compute down to a Spark DAG. In addition, SparkSQL is a heavy user
of DataFrames. DataFrames are like RDDs with schema information.
The HBase-Spark Connector (in the HBase-Spark module) leverages the
link:https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html[DataSource API]
(link:https://issues.apache.org/jira/browse/SPARK-3247[SPARK-3247])
introduced in Spark-1.2.0. It bridges the gap between the simple HBase KV store and complex
relational SQL queries, and enables users to perform complex data analytical work
on top of HBase using Spark. An HBase DataFrame is a standard Spark DataFrame, and is able to
interact with any other data sources such as Hive, ORC, Parquet, JSON, etc.
The HBase-Spark Connector applies critical techniques such as partition pruning, column pruning,
predicate pushdown and data locality.

The HBase-Spark module includes support for Spark SQL and DataFrames, which allows
you to write SparkSQL queries directly against HBase tables. In addition, the HBase-Spark
module will push down query filtering logic to HBase.
To use the HBase-Spark connector, users need to define the Catalog for the schema mapping
between the HBase and Spark tables, prepare the data and populate the HBase table,
then load the HBase DataFrame. After that, users can run integrated queries and access records
in the HBase table with SQL. The following sections illustrate the basic procedure.

=== Define catalog

[source, scala]
----
def catalog = s"""{
     |"table":{"namespace":"default", "name":"table1"},
     |"rowkey":"key",
     |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
       |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
       |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
       |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
       |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
       |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
       |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
       |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
     |}
     |}""".stripMargin
----

The catalog defines a mapping between the HBase and Spark tables. There are two critical parts of this catalog.
One is the rowkey definition and the other is the mapping between a table column in Spark and
the column family and column qualifier in HBase. The above defines a schema for an HBase table
named table1, with row key as key and a number of columns (col1 `-` col8). Note that the rowkey
also has to be defined in detail as a column (col0), which has a specific cf (rowkey).

=== Save the DataFrame

[source, scala]
----
case class HBaseRecord(
  col0: String,
  col1: Boolean,
  col2: Double,
  col3: Float,
  col4: Int,
  col5: Long,
  col6: Short,
  col7: String,
  col8: Byte)

object HBaseRecord
{
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}

val data = (0 to 255).map { i => HBaseRecord(i, "extra") }

sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
----

`data` prepared by the user is a local Scala collection which has 256 HBaseRecord objects.
`sc.parallelize(data)` distributes `data` to form an RDD. `toDF` returns a DataFrame.
The `write` function returns a DataFrameWriter used to write the DataFrame to external storage
systems (e.g. HBase here). Given a DataFrame with the specified schema `catalog`, the `save` function
will create an HBase table with 5 regions and save the DataFrame inside.
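
The snippets in this section assume that a SparkContext, an HBaseContext and a SQLContext are
already in scope. A minimal setup sketch for the Spark 1.x API used throughout this section might
look like the following (the application name and configuration details are illustrative, not part
of the connector API):

[source, scala]
----
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("hbase-dataframe-example"))
// Creating an HBaseContext makes the HBase configuration available to the connector.
new HBaseContext(sc, HBaseConfiguration.create())
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // needed for toDF and the $"col" syntax
----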

=== Load the DataFrame

[source, scala]
----
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.hadoop.hbase.spark")
    .load()
}
val df = withCatalog(catalog)
----

In the `withCatalog` function, `sqlContext` is a variable of SQLContext, which is the entry point
for working with structured data (rows and columns) in Spark.
`read` returns a DataFrameReader that can be used to read data in as a DataFrame.
The `options` function adds input options for the underlying data source to the DataFrameReader,
and the `format` function specifies the input data source format for the DataFrameReader.
The `load()` function loads input in as a DataFrame. The data frame `df` returned
by `withCatalog` can be used to access the HBase table, as in the Language Integrated Query
and SQL Query examples below.
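
Because `df` is a regular DataFrame, it can also interact with other data sources; for example
(a hedged sketch, with illustrative output paths):

[source, scala]
----
// Write the HBase-backed DataFrame out to other formats...
df.write.parquet("/tmp/table1_parquet")
df.write.json("/tmp/table1_json")

// ...and read one of them back like any other Spark data source.
val parquetDF = sqlContext.read.parquet("/tmp/table1_parquet")
parquetDF.show
----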

=== Language Integrated Query

[source, scala]
----
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show
----
A DataFrame can perform various operations, such as join, sort, select, filter, orderBy and so on.
`df.filter` above filters rows using the given SQL expression. `select` selects a set of columns:
`col0`, `col1` and `col4`.
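
A few more of these operations on the same `df`, as a short illustration (plain Spark DataFrame
API, nothing HBase-specific):

[source, scala]
----
// Sort by col4 descending and show the first 10 rows.
df.select("col0", "col4").orderBy($"col4".desc).show(10)

// Group by the boolean column col1 and count the rows in each group.
df.groupBy("col1").count().show
----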

=== SQL Query

[source, scala]
----
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
----

`registerTempTable` registers the `df` DataFrame as a temporary table using the table name `table1`.
The lifetime of this temporary table is tied to the SQLContext that was used to create `df`.
The `sqlContext.sql` function allows the user to execute SQL queries.
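
For instance, a further query using the columns from the catalog defined above (an illustrative
sketch):

[source, scala]
----
sqlContext.sql("select col0, col4 from table1 where col4 > 100").show
----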

=== Others

.Query with different timestamps
====
In HBaseSparkConf, four parameters related to timestamps can be set. They are TIMESTAMP,
MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records with
different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP. In the meantime,
use a concrete value instead of tsSpecified and oldMs in the examples below.

The example below shows how to load the df DataFrame with a specific timestamp.
tsSpecified is specified by the user.
HBaseTableCatalog defines the schema mapping between the HBase table and the Spark relation.
writeCatalog defines the catalog for that schema mapping.

[source, scala]
----
val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()
----

The example below shows how to load the df DataFrame with a time range.
oldMs is specified by the user.

[source, scala]
----
val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
        HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()
----

After loading the df DataFrame, users can query the data.
----
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
----
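
MAX_VERSIONS is listed above but not shown in these examples. A minimal sketch, assuming it is
passed the same way as the other HBaseSparkConf options (all option values are strings):

[source, scala]
----
// Hedged sketch: request up to 3 versions per cell; whether and how multiple
// versions are surfaced in the DataFrame depends on the connector.
val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
        HBaseSparkConf.MAX_VERSIONS -> "3"))
      .format("org.apache.hadoop.hbase.spark")
      .load()
----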
====

=== Predicate Push Down

There are two examples of predicate push down in the HBase-Spark implementation.
The first example shows the push down of filtering logic on the RowKey. HBase-Spark
will reduce the filters on RowKeys down to a set of Get and/or Scan commands.

NOTE: The Scans are distributed scans, rather than a single client scan operation.

If the query looks something like the following, the logic will push down and get
the rows through 3 Gets and 0 Scans. We can do Gets because all the operations
are `equal` operations.

[source,sql]
----
SELECT
  KEY_FIELD,
  B_FIELD,
  A_FIELD
FROM hbaseTmp
WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
----

Now let's look at an example where we will end up doing two scans on HBase.

[source, sql]
----
SELECT
  KEY_FIELD,
  B_FIELD,
  A_FIELD
FROM hbaseTmp
WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
----

In this example we will get 0 Gets and 2 Scans. One scan will load everything
from the first row in the table until "get2" and the second scan will get
everything from "get3" until the last row in the table.

The next query is a good example of having a good deal of range checks. However,
the ranges overlap. The code will be smart enough to get the following data
in a single scan that encompasses all the data asked for by the query.

[source, sql]
----
SELECT
  KEY_FIELD,
  B_FIELD,
  A_FIELD
FROM hbaseTmp
WHERE
  (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
  (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
----

The second example of push down functionality offered by the HBase-Spark module
is the ability to push down filter logic for column and cell fields. Just like
the RowKey logic, all query logic will be consolidated into the minimum number
of range checks and equal checks by sending a Filter object along with the Scan
with information about the consolidated push down predicates.

.SparkSQL Code Example
====
This example shows how we can interact with HBase with SQL.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

new HBaseContext(sc, config)
val sqlContext = new SQLContext(sc)

val df = sqlContext.load("org.apache.hadoop.hbase.spark",
  Map("hbase.columns.mapping" ->
    "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
    "hbase.table" -> "t1"))

df.registerTempTable("hbaseTmp")

val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
  "WHERE " +
  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
----
====

There are three major parts of this example that deserve explaining.

The sqlContext.load function::
In the sqlContext.load function we see two
parameters. The first of these parameters is pointing Spark to the HBase
DefaultSource class that will act as the interface between SparkSQL and HBase.

A map of key value pairs::
In this example we have two keys in our map, `hbase.columns.mapping` and
`hbase.table`. The `hbase.table` directs SparkSQL to use the given HBase table.
The `hbase.columns.mapping` key gives us the logic to translate HBase columns to
SparkSQL columns.
+
The `hbase.columns.mapping` is a string that follows the following format:
+
[source, scala]
----
(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
----
+
In the example below we see the definition of three fields. Because KEY_FIELD has
no ColumnFamily, it is the RowKey.
+
----
KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
----

The registerTempTable function::
This is a SparkSQL function that allows us now to be free of Scala when accessing
our HBase table directly with SQL with the table name of "hbaseTmp".

The last major point to note in the example is the `sqlContext.sql` function, which
allows the user to ask their questions in SQL which will be pushed down to the
DefaultSource code in the HBase-Spark module. The result of this command will be
a DataFrame with the schema of KEY_FIELD and B_FIELD.

.Native Avro support
====
The HBase-Spark Connector supports different data formats such as Avro, JSON, etc. The use case below
shows how Spark supports Avro. Users can persist Avro records into HBase directly. Internally,
the Avro schema is converted to a native Spark Catalyst data type automatically.
Note that both the key and value parts in an HBase table can be defined in Avro format.

1) Define catalog for the schema mapping:

[source, scala]
----
def catalog = s"""{
     |"table":{"namespace":"default", "name":"Avrotable"},
     |"rowkey":"key",
     |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
     |}
     |}""".stripMargin
----

`catalog` is a schema for an HBase table named `Avrotable`, with row key as key and
one column col1. The rowkey also has to be defined in detail as a column (col0),
which has a specific cf (rowkey).

2) Prepare the Data:

[source, scala]
----
// Record wrapper: the row key string plus the serialized Avro payload.
case class AvroHBaseRecord(col0: String, col1: Array[Byte])

object AvroHBaseRecord {
  val schemaString =
    s"""{"namespace": "example.avro",
       |  "type": "record", "name": "User",
       |  "fields": [
       |    {"name": "name", "type": "string"},
       |    {"name": "favorite_number", "type": ["int", "null"]},
       |    {"name": "favorite_color", "type": ["string", "null"]},
       |    {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
       |    {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
       |  ] }""".stripMargin

  val avroSchema: Schema = {
    val p = new Schema.Parser
    p.parse(schemaString)
  }

  def apply(i: Int): AvroHBaseRecord = {
    val user = new GenericData.Record(avroSchema)
    user.put("name", s"name${"%03d".format(i)}")
    user.put("favorite_number", i)
    user.put("favorite_color", s"color${"%03d".format(i)}")
    val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
    favoriteArray.add(s"number${i}")
    favoriteArray.add(s"number${i+1}")
    user.put("favorite_array", favoriteArray)
    import collection.JavaConverters._
    val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
    user.put("favorite_map", favoriteMap)
    val avroByte = AvroSedes.serialize(user, avroSchema)
    AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
  }
}

val data = (0 to 255).map { i =>
  AvroHBaseRecord(i)
}
----

`schemaString` is defined first, then it is parsed to get `avroSchema`. `avroSchema` is used to
generate `AvroHBaseRecord`. `data` prepared by the user is a local Scala collection
which has 256 `AvroHBaseRecord` objects.

3) Save DataFrame:

[source, scala]
----
sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
----

Given a data frame with the specified schema `catalog`, the above will create an HBase table with 5
regions and save the data frame inside.

4) Load the DataFrame:

[source, scala]
----
def avroCatalog = s"""{
     |"table":{"namespace":"default", "name":"Avrotable"},
     |"rowkey":"key",
     |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
     |}
     |}""".stripMargin

def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.hadoop.hbase.spark")
    .load()
}
val df = withCatalog(avroCatalog)
----

In the `withCatalog` function, `read` returns a DataFrameReader that can be used to read data in as a DataFrame.
The `options` function adds input options for the underlying data source to the DataFrameReader.
There are two options: one sets `avroSchema` to `AvroHBaseRecord.schemaString`, and the other sets
`HBaseTableCatalog.tableCatalog` to `avroCatalog`. The `load()` function loads input in as a DataFrame.
The data frame `df` returned by the `withCatalog` function can be used to access the HBase table.
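
Since the Avro schema is converted to a Spark Catalyst type, it can be useful to inspect the schema
that the conversion produces (a small sketch; how the Avro fields are surfaced in the DataFrame is
an assumption here, not spelled out above):

[source, scala]
----
// Inspect the schema produced by the Avro-to-Catalyst conversion and a few rows.
df.printSchema()
df.show(5)
// If col1 surfaces as a struct (an assumption), its Avro fields could be selected
// as nested columns, e.g. df.select($"col0", $"col1.favorite_color").show
----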

5) SQL Query:

[source, scala]
----
df.registerTempTable("avrotable")
val c = sqlContext.sql("select count(1) from avrotable")
----

After loading the df DataFrame, users can query the data. `registerTempTable` registers the df DataFrame
as a temporary table using the table name avrotable. The `sqlContext.sql` function allows the
user to execute SQL queries.
====