diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index c71ee4e5f7d..7970816e9bd 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -88,12 +88,6 @@ case class HBaseRelation (
     userSpecifiedSchema: Option[StructType]
   )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
-
-  val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimeStamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
-  val maxTimeStamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
-  val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
   val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
@@ -210,7 +204,7 @@ case class HBaseRelation (
         System.arraycopy(x, 0, rBytes, offset, x.length)
         offset += x.length
       }
-      val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
+      val put = new Put(rBytes)

       colsIdxedFields.foreach { case (x, y) =>
         val b = Utils.toBytes(row(x), y)
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 2e4c0b39dab..ca44d424876 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -36,9 +36,4 @@ object HBaseSparkConf{
   val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
   val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
   val defaultPushDownColumnFilter = true
-
-  val TIMESTAMP = "timestamp"
-  val MIN_TIMESTAMP = "minTimestamp"
-  val MAX_TIMESTAMP = "maxTimestamp"
-  val MAX_VERSIONS = "maxVersions"
 }
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index 886114a1d53..2e056513336 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -105,7 +105,6 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     val gets = new ArrayList[Get]()
     x.foreach{ y =>
       val g = new Get(y)
-      handleTimeSemantics(g)
       columns.foreach { d =>
         if (!d.isRowKey) {
           g.addColumn(d.cfBytes, d.colBytes)
@@ -158,7 +157,6 @@ class HBaseTableScanRDD(relation: HBaseRelation,
       case (Some(Bound(a, b)), None) => new Scan(a)
       case (None, None) => new Scan()
     }
-    handleTimeSemantics(scan)

     columns.foreach { d =>
       if (!d.isRowKey) {
@@ -228,30 +226,6 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     } ++ gIt
     rIts
   }
-
-  private def handleTimeSemantics(query: Query): Unit = {
-    // Set timestamp related values if present
-    (query, relation.timestamp, relation.minTimeStamp, relation.maxTimeStamp) match {
-      case (q: Scan, Some(ts), None, None) => q.setTimeStamp(ts)
-      case (q: Get, Some(ts), None, None) => q.setTimeStamp(ts)
-
-      case (q:Scan, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-      case (q:Get, None, Some(minStamp), Some(maxStamp)) => q.setTimeRange(minStamp, maxStamp)
-
-      case (q, None, None, None) =>
-
-      case _ => throw new IllegalArgumentException(s"Invalid combination of query/timestamp/time range provided. " +
-        s"timeStamp is: ${relation.timestamp.get}, minTimeStamp is: ${relation.minTimeStamp.get}, " +
-        s"maxTimeStamp is: ${relation.maxTimeStamp.get}")
-    }
-    if (relation.maxVersions.isDefined) {
-      query match {
-        case q: Scan => q.setMaxVersions(relation.maxVersions.get)
-        case q: Get => q.setMaxVersions(relation.maxVersions.get)
-        case _ => throw new IllegalArgumentException("Invalid query provided with maxVersions")
-      }
-    }
-  }
 }

 case class SerializedFilter(b: Option[Array[Byte]])
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 4ad10e2b2ab..500967dfc46 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -19,39 +19,32 @@
 package org.apache.hadoop.hbase.spark

 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
+import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
 import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
+import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility}
 import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

 case class HBaseRecord(
-  col0: String,
-  col1: Boolean,
-  col2: Double,
-  col3: Float,
-  col4: Int,
-  col5: Long,
-  col6: Short,
-  col7: String,
-  col8: Byte)
+  col0: String,
+  col1: String,
+  col2: Double,
+  col3: Float,
+  col4: Int,
+  col5: Long)

 object HBaseRecord {
   def apply(i: Int, t: String): HBaseRecord = {
     val s = s"""row${"%03d".format(i)}"""
     HBaseRecord(s,
-      i % 2 == 0,
+      s,
       i.toDouble,
       i.toFloat,
       i,
-      i.toLong,
-      i.toShort,
-      s"String$i: $t",
-      i.toByte)
+      i.toLong)
   }
 }
@@ -822,14 +815,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
                     |"rowkey":"key",
                     |"columns":{
                     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
-                    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
+                    |"col1":{"cf":"cf1", "col":"col1", "type":"string"},
                     |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
                     |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
                     |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
-                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
-                    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
-                    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
-                    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
+                    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}
                     |}
                     |}""".stripMargin

@@ -876,75 +866,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     assert(s.count() == 6)
   }

-  test("Timestamp semantics") {
-    val sql = sqlContext
-    import sql.implicits._
-
-    // There's already some data in here from recently. Let's throw something in
-    // from 1993 which we can include/exclude and add some data with the implicit (now) timestamp.
-    // Then we should be able to cross-section it and only get points in between, get the most recent view
-    // and get an old view.
-    val oldMs = 754869600000L
-    val startMs = System.currentTimeMillis()
-    val oldData = (0 to 100).map { i =>
-      HBaseRecord(i, "old")
-    }
-    val newData = (200 to 255).map { i =>
-      HBaseRecord(i, "new")
-    }
-
-    sc.parallelize(oldData).toDF.write.options(
-      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5",
-        HBaseSparkConf.TIMESTAMP -> oldMs.toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-    sc.parallelize(newData).toDF.write.options(
-      Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.tableName -> "5"))
-      .format("org.apache.hadoop.hbase.spark")
-      .save()
-
-    // Test specific timestamp -- Full scan, Timestamp
-    val individualTimestamp = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load();
-    assert(individualTimestamp.count() == 101)
-
-    // Test getting everything -- Full Scan, No range
-    val everything = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(everything.count() == 256)
-    // Test getting everything -- Pruned Scan, TimeRange
-    val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
-    assert(element50 == "String50: extra")
-    val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
-    assert(element200 == "String200: new")
-
-    // Test Getting old stuff -- Full Scan, TimeRange
-    val oldRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(oldRange.count() == 101)
-    // Test Getting old stuff -- Pruned Scan, TimeRange
-    val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
-    assert(oldElement50 == "String50: old")
-
-    // Test Getting middle stuff -- Full Scan, TimeRange
-    val middleRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
-      .format("org.apache.hadoop.hbase.spark")
-      .load()
-    assert(middleRange.count() == 256)
-    // Test Getting middle stuff -- Pruned Scan, TimeRange
-    val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
-    assert(middleElement200 == "String200: extra")
-  }
-
   // catalog for insertion
   def avroWriteCatalog = s"""{
                     |"table":{"namespace":"default", "name":"avrotable"},
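Note for reviewers (not part of the patch): after this revert the connector always reads the newest cell version over the default (full) time range. The sketch below is a minimal illustration of the user-facing options being removed, assuming a hypothetical catalog string and an available SQLContext; the literal option keys match the deleted HBaseSparkConf constants, and anything written this way against a post-revert build will silently ignore them.

// Usage sketch of the removed read options (hypothetical catalog; illustration only).
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

// Point-in-time view: was HBaseSparkConf.TIMESTAMP, applied via Scan/Get.setTimeStamp.
def readAsOf(sqlContext: SQLContext, catalog: String, ts: Long): DataFrame =
  sqlContext.read
    .options(Map(
      HBaseTableCatalog.tableCatalog -> catalog,
      "timestamp" -> ts.toString))
    .format("org.apache.hadoop.hbase.spark")
    .load()

// Time-range view: was MIN_TIMESTAMP/MAX_TIMESTAMP, applied via setTimeRange,
// which in HBase is a half-open interval [minTs, maxTs).
def readBetween(sqlContext: SQLContext, catalog: String, minTs: Long, maxTs: Long): DataFrame =
  sqlContext.read
    .options(Map(
      HBaseTableCatalog.tableCatalog -> catalog,
      "minTimestamp" -> minTs.toString,
      "maxTimestamp" -> maxTs.toString))
    .format("org.apache.hadoop.hbase.spark")
    .load()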