HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing Yang)
This commit is contained in:
parent
8c7f044efb
commit
eec27ad7ef
|
@ -88,6 +88,12 @@ case class HBaseRelation (
|
|||
userSpecifiedSchema: Option[StructType]
|
||||
)(@transient val sqlContext: SQLContext)
|
||||
extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
|
||||
|
||||
val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
|
||||
val minTimeStamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
|
||||
val maxTimeStamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
|
||||
val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
|
||||
|
||||
val catalog = HBaseTableCatalog(parameters)
|
||||
def tableName = catalog.name
|
||||
val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
|
||||
|
@ -204,7 +210,7 @@ case class HBaseRelation (
|
|||
System.arraycopy(x, 0, rBytes, offset, x.length)
|
||||
offset += x.length
|
||||
}
|
||||
val put = new Put(rBytes)
|
||||
val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
|
||||
|
||||
colsIdxedFields.foreach { case (x, y) =>
|
||||
val b = Utils.toBytes(row(x), y)
|
||||
|
|
|
@ -36,4 +36,9 @@ object HBaseSparkConf{
|
|||
val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
|
||||
val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
|
||||
val defaultPushDownColumnFilter = true
|
||||
|
||||
val TIMESTAMP = "timestamp"
|
||||
val MIN_TIMESTAMP = "minTimestamp"
|
||||
val MAX_TIMESTAMP = "maxTimestamp"
|
||||
val MAX_VERSIONS = "maxVersions"
|
||||
}
|
||||
|
|
|
@ -105,6 +105,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
|
|||
val gets = new ArrayList[Get]()
|
||||
x.foreach{ y =>
|
||||
val g = new Get(y)
|
||||
handleTimeSemantics(g)
|
||||
columns.foreach { d =>
|
||||
if (!d.isRowKey) {
|
||||
g.addColumn(d.cfBytes, d.colBytes)
|
||||
|
@ -157,6 +158,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
|
|||
case (Some(Bound(a, b)), None) => new Scan(a)
|
||||
case (None, None) => new Scan()
|
||||
}
|
||||
handleTimeSemantics(scan)
|
||||
|
||||
columns.foreach { d =>
|
||||
if (!d.isRowKey) {
|
||||
|
@ -226,6 +228,30 @@ class HBaseTableScanRDD(relation: HBaseRelation,
|
|||
} ++ gIt
|
||||
rIts
|
||||
}
|
||||
|
||||
private def handleTimeSemantics(query: Query): Unit = {
|
||||
// Set timestamp related values if present
|
||||
(query, relation.timestamp, relation.minTimeStamp, relation.maxTimeStamp) match {
|
||||
case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
|
||||
case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
|
||||
|
||||
case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
|
||||
case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
|
||||
|
||||
case (q, None, None, None) =>
|
||||
|
||||
case _ => throw new IllegalArgumentException(s"Invalid combination of query/timestamp/time range provided. " +
|
||||
s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: ${relation.minTimeStamp.get}, " +
|
||||
s"maxTimeStamp is: ${relation.maxTimeStamp.get}")
|
||||
}
|
||||
if (relation.maxVersions.isDefined) {
|
||||
query match {
|
||||
case q: Scan => q.setMaxVersions(relation.maxVersions.get)
|
||||
case q: Get => q.setMaxVersions(relation.maxVersions.get)
|
||||
case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class SerializedFilter(b: Option[Array[Byte]])
|
||||
|
|
|
@ -19,32 +19,39 @@ package org.apache.hadoop.hbase.spark
|
|||
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.avro.generic.GenericData
|
||||
import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
|
||||
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
|
||||
import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
|
||||
import org.apache.hadoop.hbase.util.Bytes
|
||||
import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
|
||||
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
|
||||
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.{DataFrame, SQLContext}
|
||||
import org.apache.spark.{SparkConf, SparkContext, Logging}
|
||||
import org.apache.spark.{Logging, SparkConf, SparkContext}
|
||||
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
|
||||
|
||||
case class HBaseRecord(
|
||||
col0: String,
|
||||
col1: String,
|
||||
col1: Boolean,
|
||||
col2: Double,
|
||||
col3: Float,
|
||||
col4: Int,
|
||||
col5: Long)
|
||||
col5: Long,
|
||||
col6: Short,
|
||||
col7: String,
|
||||
col8: Byte)
|
||||
|
||||
object HBaseRecord {
|
||||
def apply(i: Int, t: String): HBaseRecord = {
|
||||
val s = s"""row${"%03d".format(i)}"""
|
||||
HBaseRecord(s,
|
||||
s,
|
||||
i % 2 == 0,
|
||||
i.toDouble,
|
||||
i.toFloat,
|
||||
i,
|
||||
i.toLong)
|
||||
i.toLong,
|
||||
i.toShort,
|
||||
s"String$i: $t",
|
||||
i.toByte)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -815,11 +822,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
|
|||
|"rowkey":"key",
|
||||
|"columns":{
|
||||
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|
||||
|"col1":{"cf":"cf1", "col":"col1", "type":"string"},
|
||||
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|
||||
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|
||||
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|
||||
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|
||||
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}}
|
||||
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|
||||
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|
||||
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|
||||
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|
||||
|}
|
||||
|}""".stripMargin
|
||||
|
||||
|
@ -866,6 +876,75 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
|
|||
assert(s.count() == 6)
|
||||
}
|
||||
|
||||
test("Timestamp semantics") {
|
||||
val sql = sqlContext
|
||||
import sql.implicits._
|
||||
|
||||
// There's already some data in here from recently. Let's throw something in
|
||||
// from 1993 which we can include/exclude and add some data with the implicit (now) timestamp.
|
||||
// Then we should be able to cross-section it and only get points in between, get the most recent view
|
||||
// and get an old view.
|
||||
val oldMs = 754869600000L
|
||||
val startMs = System.currentTimeMillis()
|
||||
val oldData = (0 to 100).map { i =>
|
||||
HBaseRecord(i, "old")
|
||||
}
|
||||
val newData = (200 to 255).map { i =>
|
||||
HBaseRecord(i, "new")
|
||||
}
|
||||
|
||||
sc.parallelize(oldData).toDF.write.options(
|
||||
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
|
||||
HBaseSparkConf.TIMESTAMP -> oldMs.toString))
|
||||
.format("org.apache.hadoop.hbase.spark")
|
||||
.save()
|
||||
sc.parallelize(newData).toDF.write.options(
|
||||
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
|
||||
.format("org.apache.hadoop.hbase.spark")
|
||||
.save()
|
||||
|
||||
// Test specific timestamp -- Full scan, Timestamp
|
||||
val individualTimestamp = sqlContext.read
|
||||
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
|
||||
.format("org.apache.hadoop.hbase.spark")
|
||||
.load();
|
||||
assert(individualTimestamp.count() == 101)
|
||||
|
||||
// Test getting everything -- Full Scan, No range
|
||||
val everything = sqlContext.read
|
||||
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
|
||||
.format("org.apache.hadoop.hbase.spark")
|
||||
.load()
|
||||
assert(everything.count() == 256)
|
||||
// Test getting everything -- Pruned Scan, TimeRange
|
||||
val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
|
||||
assert(element50 == "String50: extra")
|
||||
val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
|
||||
assert(element200 == "String200: new")
|
||||
|
||||
// Test Getting old stuff -- Full Scan, TimeRange
|
||||
val oldRange = sqlContext.read
|
||||
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
|
||||
HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
|
||||
.format("org.apache.hadoop.hbase.spark")
|
||||
.load()
|
||||
assert(oldRange.count() == 101)
|
||||
// Test Getting old stuff -- Pruned Scan, TimeRange
|
||||
val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
|
||||
assert(oldElement50 == "String50: old")
|
||||
|
||||
// Test Getting middle stuff -- Full Scan, TimeRange
|
||||
val middleRange = sqlContext.read
|
||||
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
|
||||
HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
|
||||
.format("org.apache.hadoop.hbase.spark")
|
||||
.load()
|
||||
assert(middleRange.count() == 256)
|
||||
// Test Getting middle stuff -- Pruned Scan, TimeRange
|
||||
val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
|
||||
assert(middleElement200 == "String200: extra")
|
||||
}
|
||||
|
||||
// catalog for insertion
|
||||
def avroWriteCatalog = s"""{
|
||||
|"table":{"namespace":"default", "name":"avrotable"},
|
||||
|
|
Loading…
Reference in New Issue