HBASE-14849 Add option to set block cache to false on SparkSQL executions (Zhan Zhang)
parent 8c921ea94f
commit e75e26e3c6
@@ -21,7 +21,9 @@ import java.util
 import java.util.concurrent.ConcurrentLinkedQueue

 import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.spark.datasources.{HBaseTableScanRDD, HBaseRegion, SerializableConfiguration}
+import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
+import org.apache.hadoop.hbase.spark.datasources.HBaseTableScanRDD
+import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration
 import org.apache.hadoop.hbase.types._
 import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange}
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
@@ -49,8 +51,6 @@ class DefaultSource extends RelationProvider with Logging {

   val TABLE_KEY:String = "hbase.table"
   val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
-  val BATCHING_NUM_KEY:String = "hbase.batching.num"
-  val CACHING_NUM_KEY:String = "hbase.caching.num"
   val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
   val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
   val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter"
@@ -71,35 +71,16 @@ class DefaultSource extends RelationProvider with Logging {
         new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + tableName + "'")

     val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
-    val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000")
-    val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
     val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
     val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
     val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true")

-    val batchingNum:Int = try {
-      batchingNumStr.toInt
-    } catch {
-      case e:NumberFormatException => throw
-        new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY +" '"
-          + batchingNumStr + "'", e)
-    }
-
-    val cachingNum:Int = try {
-      cachingNumStr.toInt
-    } catch {
-      case e:NumberFormatException => throw
-        new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY +" '"
-          + cachingNumStr + "'", e)
-    }
-
     new HBaseRelation(tableName.get,
       generateSchemaMappingMap(schemaMappingString),
-      batchingNum.toInt,
-      cachingNum.toInt,
       hbaseConfigResources,
       useHBaseReources.equalsIgnoreCase("true"),
-      usePushDownColumnFilter.equalsIgnoreCase("true"))(sqlContext)
+      usePushDownColumnFilter.equalsIgnoreCase("true"),
+      parameters)(sqlContext)
   }

   /**
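Note: the hunk above drops the old "hbase.batching.num" / "hbase.caching.num" options together with their hand-rolled toInt parsing, and instead hands the whole parameters map to HBaseRelation. A usage sketch, not part of the patch (it assumes a SQLContext named sqlContext and the t1 table/mapping used in the test suite further down), showing how the scan tuning keys defined in HBaseSparkConf can now be supplied per table:

val df = sqlContext.load("org.apache.hadoop.hbase.spark",
  Map("hbase.table" -> "t1",
    "hbase.columns.mapping" ->
      "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
    "spark.hbase.blockcache.enable" -> "false", // per-table override of the SparkConf value
    "spark.hbase.cacheSize" -> "100",           // rows fetched per RPC (Scan.setCaching)
    "spark.hbase.batchNum" -> "100"))           // cells per Result (Scan.setBatch)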
@@ -148,10 +129,6 @@ class DefaultSource extends RelationProvider with Logging {
  * @param tableName HBase table that we plan to read from
  * @param schemaMappingDefinition SchemaMapping information to map HBase
  *                                Qualifiers to SparkSQL columns
- * @param batchingNum The batching number to be applied to the
- *                    scan object
- * @param cachingNum The caching number to be applied to the
- *                   scan object
  * @param configResources Optional comma separated list of config resources
  *                        to get based on their URI
  * @param useHBaseContext If true this will look to see if
@@ -162,14 +139,26 @@ class DefaultSource extends RelationProvider with Logging {
 case class HBaseRelation (val tableName:String,
                     val schemaMappingDefinition:
                     java.util.HashMap[String, SchemaQualifierDefinition],
-                    val batchingNum:Int,
-                    val cachingNum:Int,
                     val configResources:String,
                     val useHBaseContext:Boolean,
-                    val usePushDownColumnFilter:Boolean) (
+                    val usePushDownColumnFilter:Boolean,
+                    @transient parameters: Map[String, String] ) (
                     @transient val sqlContext:SQLContext)
   extends BaseRelation with PrunedFilteredScan with Logging {

+  // The user supplied per table parameter will overwrite global ones in SparkConf
+  val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+    .getOrElse(
+      sqlContext.sparkContext.getConf.getBoolean(
+        HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
+  val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+    .getOrElse(
+      sqlContext.sparkContext.getConf.getInt(
+        HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
+  val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+    .getOrElse(sqlContext.sparkContext.getConf.getInt(
+      HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))
+
   //create or get latest HBaseContext
   @transient val hbaseContext:HBaseContext = if (useHBaseContext) {
     LatestHBaseContextCache.latest
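The three vals added above resolve each setting with per-table-first precedence: a key present in the table's parameters wins, otherwise the global SparkConf value is used, otherwise the compiled-in default. A minimal, self-contained sketch of that lookup (hypothetical helper name; only a plain SparkConf is assumed, no SparkContext needed):

import org.apache.spark.SparkConf

object OptionPrecedenceSketch {
  // parameters -> SparkConf -> default, mirroring the blockCacheEnable lookup above
  def resolveBoolean(key: String, parameters: Map[String, String],
      conf: SparkConf, default: Boolean): Boolean =
    parameters.get(key).map(_.toBoolean)
      .getOrElse(conf.getBoolean(key, default))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.hbase.blockcache.enable", "true")
    // The per-table setting wins over the global SparkConf value: prints false.
    println(resolveBoolean("spark.hbase.blockcache.enable",
      Map("spark.hbase.blockcache.enable" -> "false"), conf, default = true))
  }
}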
@@ -321,8 +310,9 @@ case class HBaseRelation (val tableName:String,

     if (resultRDD == null) {
       val scan = new Scan()
-      scan.setBatch(batchingNum)
-      scan.setCaching(cachingNum)
+      scan.setCacheBlocks(blockCacheEnable)
+      scan.setBatch(batchNum)
+      scan.setCaching(cacheSize)
       requiredQualifierDefinitionList.foreach( d =>
         scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))

@@ -17,8 +17,6 @@

 package org.apache.hadoop.hbase.spark.datasources

-import org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
-import org.apache.spark.Partition
 import org.apache.hadoop.hbase.spark.hbase._

 /**
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+object HBaseSparkConf{
+  // This is the hbase configuration. Users can either set it in SparkConf, which
+  // will take effect globally, or configure it per table, which will overwrite the value
+  // set in SparkConf. If not set, the default value will take effect.
+  val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
+  // The block cache is enabled by default, following HBase convention, but note that
+  // this potentially may slow down the system.
+  val defaultBlockCacheEnable = true
+  val CACHE_SIZE = "spark.hbase.cacheSize"
+  val defaultCachingSize = 1000
+  val BATCH_NUM = "spark.hbase.batchNum"
+  val defaultBatchNum = 1000
+}
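HBaseSparkConf above only defines key names and defaults; globally they are ordinary SparkConf entries. An illustrative setup, values are examples only and mirror the DefaultSourceSuite change further down:

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setMaster("local")
  .setAppName("hbase-spark-scan")
  .set("spark.hbase.blockcache.enable", "false") // Scan.setCacheBlocks(false) on every scan
  .set("spark.hbase.cacheSize", "100")           // Scan.setCaching: rows fetched per RPC
  .set("spark.hbase.batchNum", "100")            // Scan.setBatch: cells returned per Result
val sc = new SparkContext(sparkConf)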
@@ -17,17 +17,11 @@

 package org.apache.hadoop.hbase.spark.datasources

-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.filter.Filter
 import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation, SparkSQLPushDownFilter}
 import org.apache.hadoop.hbase.spark.hbase._
 import org.apache.hadoop.hbase.spark.datasources.HBaseResources._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.{TaskContext, Logging, Partition}
+import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition}
 import org.apache.spark.rdd.RDD

 import scala.collection.mutable
@@ -37,6 +31,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     @transient val filter: Option[SparkSQLPushDownFilter] = None,
     val columns: Seq[SchemaQualifierDefinition] = Seq.empty
     )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging {
+  private def sparkConf = SparkEnv.get.conf
   var ranges = Seq.empty[Range]
   def addRange(r: ScanRange) = {
     val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
@@ -106,8 +101,9 @@ class HBaseTableScanRDD(relation: HBaseRelation,
         scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)
       }
     }
-    scan.setBatch(relation.batchingNum)
-    scan.setCaching(relation.cachingNum)
+    scan.setCacheBlocks(relation.blockCacheEnable)
+    scan.setBatch(relation.batchNum)
+    scan.setCaching(relation.cacheSize)
     filter.foreach(scan.setFilter(_))
     scan
   }
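Both hunks that touch Scan construction now apply the same three setters. For reference, what they do on a plain HBase client Scan (standalone sketch, values illustrative):

import org.apache.hadoop.hbase.client.Scan

val scan = new Scan()
scan.setCacheBlocks(false) // keep a one-off analytical scan from churning the region server block cache
scan.setCaching(100)       // rows fetched per RPC round trip
scan.setBatch(100)         // maximum cells returned per Result, useful for very wide rows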
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.spark.datasources
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.util.Utils

 import scala.util.control.NonFatal

@@ -18,10 +18,11 @@
 package org.apache.hadoop.hbase.spark

 import org.apache.hadoop.hbase.client.{Put, ConnectionFactory}
+import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{TableNotFoundException, TableName, HBaseTestingUtility}
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

 class DefaultSourceSuite extends FunSuite with
@@ -57,8 +58,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     logInfo(" - creating table " + t2TableName)
     TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
     logInfo(" - created table")
-
-    sc = new SparkContext("local", "test")
+    val sparkConf = new SparkConf
+    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
+    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
+    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sc = new SparkContext("local", "test", sparkConf)

     val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
     try {
@@ -139,18 +143,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map("hbase.columns.mapping" ->
         "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-        "hbase.table" -> "t1",
-        "hbase.batching.num" -> "100",
-        "cachingNum" -> "100"))
+        "hbase.table" -> "t1"))

     df.registerTempTable("hbaseTable1")

     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map("hbase.columns.mapping" ->
         "KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,",
-        "hbase.table" -> "t2",
-        "hbase.batching.num" -> "100",
-        "cachingNum" -> "100"))
+        "hbase.table" -> "t2"))

     df.registerTempTable("hbaseTable2")
   }
@@ -635,49 +635,32 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     }
   }

-  test("Test bad hbase.batching.num type") {
-    intercept[IllegalArgumentException] {
-      df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
-          "hbase.table" -> "t1", "hbase.batching.num" -> "foo"))
-
-      df.registerTempTable("hbaseIntWrongTypeTmp")
-
-      val result = sqlContext.sql("SELECT KEY_FIELD, " +
-        "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
-
-      assert(result.count() == 5)
-
-      val localResult = result.take(5)
-      localResult.length
-
-      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-      assert(executionRules.dynamicLogicExpression == null)
-
-
+  test("Test HBaseSparkConf matching") {
+    val df = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "100",
+        "batchNum" -> "100",
+        "blockCacheingEnable" -> "true", "rowNum" -> "10"))
+    assert(df.count() == 10)
+
+    val df1 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "1000",
+        "batchNum" -> "100", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
+    intercept[Exception] {
+      assert(df1.count() == 10)
     }
-  }
-
-  test("Test bad hbase.caching.num type") {
-    intercept[IllegalArgumentException] {
-      df = sqlContext.load("org.apache.hadoop.hbase.spark",
-        Map("hbase.columns.mapping" ->
-          "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,",
-          "hbase.table" -> "t1", "hbase.caching.num" -> "foo"))
-
-      df.registerTempTable("hbaseIntWrongTypeTmp")
-
-      val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, " +
-        "I_FIELD FROM hbaseIntWrongTypeTmp")
-
-      val localResult = result.take(10)
-      assert(localResult.length == 5)
-
-      val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
-      assert(executionRules.dynamicLogicExpression == null)
-
-
+
+    val df2 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "100",
+        "batchNum" -> "1000", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
+    intercept[Exception] {
+      assert(df2.count() == 10)
+    }
+
+    val df3 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
+      Map("cacheSize" -> "100",
+        "batchNum" -> "100", "blockCacheingEnable" -> "false", "rowNum" -> "10"))
+    intercept[Exception] {
+      assert(df3.count() == 10)
     }
   }

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
+import org.apache.spark.SparkEnv
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+class HBaseTestSource extends RelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    DummyScan(
+      parameters("cacheSize").toInt,
+      parameters("batchNum").toInt,
+      parameters("blockCacheingEnable").toBoolean,
+      parameters("rowNum").toInt)(sqlContext)
+  }
+}
+
+case class DummyScan(
+     cacheSize: Int,
+     batchNum: Int,
+     blockCachingEnable: Boolean,
+     rowNum: Int)(@transient val sqlContext: SQLContext)
+    extends BaseRelation with TableScan {
+  private def sparkConf = SparkEnv.get.conf
+  override def schema: StructType =
+    StructType(StructField("i", IntegerType, nullable = false) :: Nil)
+
+  override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
+    .map(Row(_))
+    .map{ x =>
+      if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
+          HBaseSparkConf.defaultBatchNum) != batchNum ||
+        sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
+          HBaseSparkConf.defaultCachingSize) != cacheSize ||
+        sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
+          HBaseSparkConf.defaultBlockCacheEnable)
+          != blockCachingEnable) {
+        throw new Exception("HBase Spark configuration cannot be set properly")
+      }
+      x
+    }
+}