From eec27ad7ef7b5078f705301bd3042991d4d4b4d9 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 31 Mar 2016 19:08:33 -0700 Subject: [PATCH] HBASE-15572 Adding optional timestamp semantics to HBase-Spark (Weiqing Yang) --- .../hadoop/hbase/spark/DefaultSource.scala | 8 +- .../spark/datasources/HBaseSparkConf.scala | 5 + .../spark/datasources/HBaseTableScanRDD.scala | 26 +++++ .../hbase/spark/DefaultSourceSuite.scala | 105 +++++++++++++++--- 4 files changed, 130 insertions(+), 14 deletions(-) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index 7970816e9bd..c71ee4e5f7d 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -88,6 +88,12 @@ case class HBaseRelation ( userSpecifiedSchema: Option[StructType] )(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging { + + val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong) + val minTimeStamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong) + val maxTimeStamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong) + val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt) + val catalog = HBaseTableCatalog(parameters) def tableName = catalog.name val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "") @@ -204,7 +210,7 @@ case class HBaseRelation ( System.arraycopy(x, 0, rBytes, offset, x.length) offset += x.length } - val put = new Put(rBytes) + val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _)) colsIdxedFields.foreach { case (x, y) => val b = Utils.toBytes(row(x), y) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala index ca44d424876..2e4c0b39dab 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala @@ -36,4 +36,9 @@ object HBaseSparkConf{ val USE_HBASE_CONTEXT = "hbase.use.hbase.context" val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter" val defaultPushDownColumnFilter = true + + val TIMESTAMP = "timestamp" + val MIN_TIMESTAMP = "minTimestamp" + val MAX_TIMESTAMP = "maxTimestamp" + val MAX_VERSIONS = "maxVersions" } diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala index 2e056513336..886114a1d53 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala @@ -105,6 +105,7 @@ class HBaseTableScanRDD(relation: HBaseRelation, val gets = new ArrayList[Get]() x.foreach{ y => val g = new Get(y) + handleTimeSemantics(g) columns.foreach { d => if (!d.isRowKey) { g.addColumn(d.cfBytes, d.colBytes) @@ -157,6 +158,7 @@ class HBaseTableScanRDD(relation: HBaseRelation, case (Some(Bound(a, b)), None) => new Scan(a) case (None, None) => new Scan() } + handleTimeSemantics(scan) columns.foreach { d => if (!d.isRowKey) { @@ -226,6 +228,30 @@ class HBaseTableScanRDD(relation: HBaseRelation, } ++ gIt rIts } + + private def handleTimeSemantics(query: Query): Unit = { + // Set timestamp related values if present + (query, relation.timestamp, relation.minTimeStamp, relation.maxTimeStamp) match { + case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts) + case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts) + + case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp) + case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp) + + case (q, None, None, None) => + + case _ => throw new IllegalArgumentException(s"Invalid combination of query/timestamp/time range provided. " + + s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: ${relation.minTimeStamp.get}, " + + s"maxTimeStamp is: ${relation.maxTimeStamp.get}") + } + if (relation.maxVersions.isDefined) { + query match { + case q: Scan => q.setMaxVersions(relation.maxVersions.get) + case q: Get => q.setMaxVersions(relation.maxVersions.get) + case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions") + } + } + } } case class SerializedFilter(b: Option[Array[Byte]]) diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala index 500967dfc46..4ad10e2b2ab 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -19,32 +19,39 @@ package org.apache.hadoop.hbase.spark import org.apache.avro.Schema import org.apache.avro.generic.GenericData -import org.apache.hadoop.hbase.client.{Put, ConnectionFactory} +import org.apache.hadoop.hbase.client.{ConnectionFactory, Put} import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility} +import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName} import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog +import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SparkConf, SparkContext} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} case class HBaseRecord( - col0: String, - col1: String, - col2: Double, - col3: Float, - col4: Int, - col5: Long) + col0: String, + col1: Boolean, + col2: Double, + col3: Float, + col4: Int, + col5: Long, + col6: Short, + col7: String, + col8: Byte) object HBaseRecord { def apply(i: Int, t: String): HBaseRecord = { val s = s"""row${"%03d".format(i)}""" HBaseRecord(s, - s, + i % 2 == 0, i.toDouble, i.toFloat, i, - i.toLong) + i.toLong, + i.toShort, + s"String$i: $t", + i.toByte) } } @@ -815,11 +822,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, - |"col1":{"cf":"cf1", "col":"col1", "type":"string"}, + |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, - |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}} + |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, + |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, + |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, + |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin @@ -866,6 +876,75 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(s.count() == 6) } + test("Timestamp semantics") { + val sql = sqlContext + import sql.implicits._ + + // There's already some data in here from recently. Let's throw something in + // from 1993 which we can include/exclude and add some data with the implicit (now) timestamp. + // Then we should be able to cross-section it and only get points in between, get the most recent view + // and get an old view. + val oldMs = 754869600000L + val startMs = System.currentTimeMillis() + val oldData = (0 to 100).map { i => + HBaseRecord(i, "old") + } + val newData = (200 to 255).map { i => + HBaseRecord(i, "new") + } + + sc.parallelize(oldData).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5", + HBaseSparkConf.TIMESTAMP -> oldMs.toString)) + .format("org.apache.hadoop.hbase.spark") + .save() + sc.parallelize(newData).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5")) + .format("org.apache.hadoop.hbase.spark") + .save() + + // Test specific timestamp -- Full scan, Timestamp + val individualTimestamp = sqlContext.read + .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString)) + .format("org.apache.hadoop.hbase.spark") + .load(); + assert(individualTimestamp.count() == 101) + + // Test getting everything -- Full Scan, No range + val everything = sqlContext.read + .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog)) + .format("org.apache.hadoop.hbase.spark") + .load() + assert(everything.count() == 256) + // Test getting everything -- Pruned Scan, TimeRange + val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0) + assert(element50 == "String50: extra") + val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0) + assert(element200 == "String200: new") + + // Test Getting old stuff -- Full Scan, TimeRange + val oldRange = sqlContext.read + .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0", + HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString)) + .format("org.apache.hadoop.hbase.spark") + .load() + assert(oldRange.count() == 101) + // Test Getting old stuff -- Pruned Scan, TimeRange + val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0) + assert(oldElement50 == "String50: old") + + // Test Getting middle stuff -- Full Scan, TimeRange + val middleRange = sqlContext.read + .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0", + HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString)) + .format("org.apache.hadoop.hbase.spark") + .load() + assert(middleRange.count() == 256) + // Test Getting middle stuff -- Pruned Scan, TimeRange + val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0) + assert(middleElement200 == "String200: extra") + } + // catalog for insertion def avroWriteCatalog = s"""{ |"table":{"namespace":"default", "name":"avrotable"},