HBASE-15572 Adding optional timestamp semantics to HBase-Spark

Four parameters, "timestamp", "minTimestamp", "maxTimestamp" and
"maxVersions", are added to HBaseSparkConf. Users can select a single
timestamp, or a time range bounded by a minimum and a maximum
timestamp.
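
A quick usage sketch (modeled on the test added in this commit; `sc`,
`sqlContext`, `writeCatalog`, and the `HBaseRecord` factory are assumed to be
defined as in that test, and the timestamp value is illustrative, not part of
this change):

----
// Hypothetical usage sketch, not part of the commit itself.
import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

val sql = sqlContext
import sql.implicits._                                 // needed for .toDF on an RDD

val tsSpecified = 754869600000L                        // illustrative timestamp (ms since epoch)
val data = (0 to 100).map(i => HBaseRecord(i, "old"))  // assumed HBaseRecord(i, t) factory

// Write cells at an explicit timestamp ...
sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
  .format("org.apache.hadoop.hbase.spark")
  .save()

// ... then read back either that exact timestamp or a [min, max) time range.
val atTs = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()

val inRange = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog,
    HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> (tsSpecified + 1).toString))
  .format("org.apache.hadoop.hbase.spark")
  .load()
----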

Signed-off-by: Sean Busbey <busbey@apache.org>
Signed-off-by: Ted Yu <tedyu@apache.org>
Signed-off-by: Jerry He <jerryjch@apache.org>
Weiqing Yang 2016-04-13 22:35:27 -05:00 committed by Sean Busbey
parent 5a7c8dcb68
commit 58177c103f
5 changed files with 165 additions and 14 deletions


@@ -88,6 +88,11 @@ case class HBaseRelation (
userSpecifiedSchema: Option[StructType]
)(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
val catalog = HBaseTableCatalog(parameters)
def tableName = catalog.name
val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
@@ -204,7 +209,7 @@ case class HBaseRelation (
System.arraycopy(x, 0, rBytes, offset, x.length)
offset += x.length
}
val put = new Put(rBytes)
val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
colsIdxedFields.foreach { case (x, y) =>
val b = Utils.toBytes(row(x), y)

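For readers unfamiliar with `Option.fold` in the `Put` construction above:
when `timestamp` is `None` the plain constructor is used, otherwise the
timestamped one. A minimal standalone sketch (the row key value is
illustrative):

----
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

val rBytes: Array[Byte] = Bytes.toBytes("row001")   // illustrative row key
val timestamp: Option[Long] = Some(754869600000L)   // None => server-assigned timestamps

// None     => new Put(rBytes)       cells get the region server's current time
// Some(ts) => new Put(rBytes, ts)   cells are written at the explicit timestamp
val put = timestamp.fold(new Put(rBytes))(ts => new Put(rBytes, ts))
----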

@@ -36,4 +36,9 @@ object HBaseSparkConf{
val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
val defaultPushDownColumnFilter = true
val TIMESTAMP = "hbase.spark.query.timestamp"
val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
val MAX_VERSIONS = "hbase.spark.query.maxVersions"
}


@@ -105,6 +105,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
val gets = new ArrayList[Get]()
x.foreach{ y =>
val g = new Get(y)
handleTimeSemantics(g)
columns.foreach { d =>
if (!d.isRowKey) {
g.addColumn(d.cfBytes, d.colBytes)
@@ -157,6 +158,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
case (Some(Bound(a, b)), None) => new Scan(a)
case (None, None) => new Scan()
}
handleTimeSemantics(scan)
columns.foreach { d =>
if (!d.isRowKey) {
@@ -226,6 +228,30 @@ class HBaseTableScanRDD(relation: HBaseRelation,
} ++ gIt
rIts
}
private def handleTimeSemantics(query: Query): Unit = {
// Set timestamp related values if present
(query, relation.timestamp, relation.minTimestamp, relation.maxTimestamp) match {
case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
case (q: Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
case (q: Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
case (q, None, None, None) =>
case _ => throw new IllegalArgumentException("Invalid combination of query/timestamp/time range provided. " +
s"timeStamp is: ${relation.timestamp}, minTimeStamp is: ${relation.minTimestamp}, " +
s"maxTimeStamp is: ${relation.maxTimestamp}")
}
if (relation.maxVersions.isDefined) {
query match {
case q: Scan => q.setMaxVersions(relation.maxVersions.get)
case q: Get => q.setMaxVersions(relation.maxVersions.get)
case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions")
}
}
}
}
case class SerializedFilter(b: Option[Array[Byte]])
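
In plain HBase client terms, handleTimeSemantics maps the three settings onto
the underlying Scan/Get roughly as sketched below (values are illustrative;
TIMESTAMP and a MIN/MAX range are mutually exclusive, and any other
combination throws IllegalArgumentException):

----
import org.apache.hadoop.hbase.client.{Get, Scan}
import org.apache.hadoop.hbase.util.Bytes

val scan = new Scan()
scan.setTimeRange(0L, System.currentTimeMillis())  // MIN_TIMESTAMP / MAX_TIMESTAMP
scan.setMaxVersions(3)                             // MAX_VERSIONS

val get = new Get(Bytes.toBytes("row001"))         // illustrative row key
get.setTimeStamp(754869600000L)                    // TIMESTAMP
----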


@@ -19,32 +19,39 @@ package org.apache.hadoop.hbase.spark
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
case class HBaseRecord(
col0: String,
col1: String,
col2: Double,
col3: Float,
col4: Int,
col5: Long)
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord {
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong)
i.toLong,
i.toShort,
s"String$i: $t",
i.toByte)
}
}
@@ -815,11 +822,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}}
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
@@ -866,6 +876,76 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(s.count() == 6)
}
test("Timestamp semantics") {
val sql = sqlContext
import sql.implicits._
// There's already some data in here from recently. Let's throw something in
// from 1993 which we can include/exclude and add some data with the implicit (now) timestamp.
// Then we should be able to cross-section it and only get points in between, get the most recent view
// and get an old view.
val oldMs = 754869600000L
val startMs = System.currentTimeMillis()
val oldData = (0 to 100).map { i =>
HBaseRecord(i, "old")
}
val newData = (200 to 255).map { i =>
HBaseRecord(i, "new")
}
sc.parallelize(oldData).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
HBaseSparkConf.TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.save()
sc.parallelize(newData).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
// Test specific timestamp -- Full scan, Timestamp
val individualTimestamp = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(individualTimestamp.count() == 101)
// Test getting everything -- Full Scan, No range
val everything = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(everything.count() == 256)
// Test getting everything -- Pruned Scan, TimeRange
val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
assert(element50 == "String50: extra")
val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
assert(element200 == "String200: new")
// Test Getting old stuff -- Full Scan, TimeRange
val oldRange = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(oldRange.count() == 101)
// Test Getting old stuff -- Pruned Scan, TimeRange
val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
assert(oldElement50 == "String50: old")
// Test Getting middle stuff -- Full Scan, TimeRange
val middleRange = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(middleRange.count() == 256)
// Test Getting middle stuff -- Pruned Scan, TimeRange
val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
assert(middleElement200 == "String200: extra")
}
// catalog for insertion
def avroWriteCatalog = s"""{
|"table":{"namespace":"default", "name":"avrotable"},


@@ -395,6 +395,41 @@ The HBase-Spark module includes support for Spark SQL and DataFrames, which allo
you to write SparkSQL directly on HBase tables. In addition the HBase-Spark
will push down query filtering logic to HBase.
HBaseSparkConf exposes four timestamp-related parameters: TIMESTAMP,
MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS. Users can query records at a
specific timestamp, or within a time range bounded by MIN_TIMESTAMP and
MAX_TIMESTAMP. In the examples below, replace the placeholders tsSpecified and
oldMs with concrete values.
.Query with different timestamps
====
The example below shows how to load the df DataFrame at a specific timestamp.
tsSpecified is supplied by the user.
HBaseTableCatalog defines the schema mapping between the HBase table and the Spark relation;
writeCatalog is the catalog string that holds this mapping.
----
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
----
The example below shows how to load the df DataFrame over a time range.
oldMs is supplied by the user.
----
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
----
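
MAX_VERSIONS can be combined with a time range; it controls how many versions
the underlying Scan or Get requests via setMaxVersions. The sketch below is
illustrative (the value 3 is arbitrary).

----
val df = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
    HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString, HBaseSparkConf.MAX_VERSIONS -> "3"))
  .format("org.apache.hadoop.hbase.spark")
  .load()
----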
After the df DataFrame is loaded, users can query the data.
----
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
----
====
=== Predicate Push Down
There are two examples of predicate push down in the HBase-Spark implementation.