HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)
commit 35d7a0cd07
parent fee67bcf14
@@ -97,36 +97,36 @@ case class HBaseRelation (
   )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
   val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
-  val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+  val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
+  val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
   val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-  val encoderClsName = parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+  val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)

   @transient val encoder = JavaBytesEncoder.create(encoderClsName)

   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
-  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
-  val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
-  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
-    .map(_.toBoolean).getOrElse(true)
+  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
+  val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
+  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
+    .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)

   // The user supplied per table parameter will overwrite global ones in SparkConf
-  val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+  val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
     .getOrElse(
       sqlContext.sparkContext.getConf.getBoolean(
-        HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
-  val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+        HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
+  val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
     .getOrElse(
       sqlContext.sparkContext.getConf.getInt(
-        HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
-  val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+        HBaseSparkConf.QUERY_CACHEDROWS, -1))
+  val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-      HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))
+      HBaseSparkConf.QUERY_BATCHSIZE, -1))

   val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-      HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.defaultBulkGetSize))
+      HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.DEFAULT_BULKGET_SIZE))

   //create or get latest HBaseContext
   val hbaseContext:HBaseContext = if (useHBaseContext) {
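The pattern repeated throughout this hunk resolves each setting in the same order: per-table parameter first, then the global SparkConf entry, then the compiled-in default. A minimal standalone sketch of that lookup using the new key names; the helper itself is hypothetical and not part of this commit:

    import org.apache.spark.SparkConf
    import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf

    // Hypothetical helper: per-table parameter -> global SparkConf value -> default.
    def resolveCacheBlocks(parameters: Map[String, String], conf: SparkConf): Boolean =
      parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS)
        .map(_.toBoolean)
        .getOrElse(conf.getBoolean(
          HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))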
@@ -37,7 +37,7 @@ private[spark] object HBaseConnectionCache extends Logging {
   val cacheStat = HBaseConnectionCacheStat(0, 0, 0)

   // in milliseconds
-  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
+  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
   private var timeout = DEFAULT_TIME_OUT
   private var closed: Boolean = false
@@ -20,35 +20,45 @@ package org.apache.hadoop.hbase.spark.datasources
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;

+/**
+ * This is the hbase configuration. User can either set them in SparkConf, which
+ * will take effect globally, or configure it per table, which will overwrite the value
+ * set in SparkConf. If not set, the default value will take effect.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 object HBaseSparkConf{
-  // This is the hbase configuration. User can either set them in SparkConf, which
-  // will take effect globally, or configure it per table, which will overwrite the value
-  // set in SparkConf. If not setted, the default value will take effect.
-  val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
-  // default block cache is set to true by default following hbase convention, but note that
-  // this potentially may slow down the system
-  val defaultBlockCacheEnable = true
-  val CACHE_SIZE = "spark.hbase.cacheSize"
-  val defaultCachingSize = 1000
-  val BATCH_NUM = "spark.hbase.batchNum"
-  val defaultBatchNum = 1000
-  val BULKGET_SIZE = "spark.hbase.bulkGetSize"
-  val defaultBulkGetSize = 1000
-
-  val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
-  val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
-  val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
-  val defaultPushDownColumnFilter = true
+
+  /** Set to false to disable server-side caching of blocks for this scan,
+   * false by default, since full table scans generate too much BC churn.
+   */
+  val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
+  val DEFAULT_QUERY_CACHEBLOCKS = false
+  /** The number of rows for caching that will be passed to scan. */
+  val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
+  /** Set the maximum number of values to return for each call to next() in scan. */
+  val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
+  /** The number of BulkGets send to HBase. */
+  val BULKGET_SIZE = "hbase.spark.bulkget.size"
+  val DEFAULT_BULKGET_SIZE = 1000
+  /** Set to specify the location of hbase configuration file. */
+  val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
+  /** Set to specify whether create or use latest cached HBaseContext*/
+  val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
+  val DEFAULT_USE_HBASECONTEXT = true
+  /** Pushdown the filter to data source engine to increase the performance of queries. */
+  val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
+  val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
+  /** Class name of the encoder, which encode data types from Spark to HBase bytes. */
+  val QUERY_ENCODER = "hbase.spark.query.encoder"
+  val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
+  /** The timestamp used to filter columns with a specific timestamp. */
   val TIMESTAMP = "hbase.spark.query.timestamp"
-  val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
-  val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+  /** The starting timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_START = "hbase.spark.query.timerange.start"
+  /** The ending timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_END = "hbase.spark.query.timerange.end"
+  /** The maximum number of version to return. */
   val MAX_VERSIONS = "hbase.spark.query.maxVersions"
-  val ENCODER = "hbase.spark.query.encoder"
-  val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
-
-  // in milliseconds
-  val connectionCloseDelay = 10 * 60 * 1000
+  /** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
+  val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
 }
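To make the new javadoc concrete, here is a hedged usage sketch: keys set in SparkConf act as application-wide defaults, and per-table options passed to the DataFrame reader override them. The table name and catalog JSON below are illustrative only and not part of this commit.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}

    // Global defaults: every hbase-spark read in this application starts from these.
    val sparkConf = new SparkConf()
      .set(HBaseSparkConf.QUERY_CACHEBLOCKS, "false")
      .set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
    val sc = new SparkContext("local", "hbase-spark-conf-example", sparkConf)
    val sqlContext = new SQLContext(sc)

    // Illustrative catalog for a hypothetical table "t1" with one column family "cf1".
    val catalog = s"""{
        |"table":{"namespace":"default", "name":"t1"},
        |"rowkey":"key",
        |"columns":{
          |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
          |"A_FIELD":{"cf":"cf1", "col":"a", "type":"string"}
        |}
      |}""".stripMargin

    // Per-table options override the SparkConf values for this DataFrame only.
    val df = sqlContext.read
      .options(Map(
        HBaseTableCatalog.tableCatalog -> catalog,
        HBaseSparkConf.QUERY_CACHEDROWS -> "1000"))
      .format("org.apache.hadoop.hbase.spark")
      .load()

With this layout, QUERY_CACHEBLOCKS and QUERY_BATCHSIZE come from SparkConf, while QUERY_CACHEDROWS is taken from the per-table option map, matching the override behavior documented above.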
@@ -116,9 +116,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
     logInfo(" - created table")
     val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")

     sc = new SparkContext("local", "test", sparkConf)
@@ -791,7 +791,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map(HBaseTableCatalog.tableCatalog->catalog,
-        HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
+        HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))

     df.registerTempTable("hbaseNoPushDownTmp")
@@ -913,8 +913,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {

     // Test Getting old stuff -- Full Scan, TimeRange
     val oldRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+        HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
       .format("org.apache.hadoop.hbase.spark")
       .load()
     assert(oldRange.count() == 101)
@@ -924,8 +924,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {

     // Test Getting middle stuff -- Full Scan, TimeRange
     val middleRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+        HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
       .format("org.apache.hadoop.hbase.spark")
       .load()
     assert(middleRange.count() == 256)
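The two tests above exercise TIMERANGE_START/TIMERANGE_END; for a single exact version the (unchanged) TIMESTAMP key plays the same role. A hedged sketch, reusing writeCatalog and oldMs from the surrounding suite and not part of this commit:

    // Hypothetical companion read: pin the scan to cells written at exactly oldMs.
    val atTimestamp = sqlContext.read
      .options(Map(
        HBaseTableCatalog.tableCatalog -> writeCatalog,
        HBaseSparkConf.TIMESTAMP -> oldMs.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()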
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 class DynamicLogicExpressionSuite extends FunSuite with
 BeforeAndAfterEach with BeforeAndAfterAll with Logging {

-  val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+  val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)

   test("Basic And Test") {
     val leftLogic = new LessThanLogicExpression("Col1", 0)
@@ -49,13 +49,12 @@ case class DummyScan(
   override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
     .map(Row(_))
     .map{ x =>
-      if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
-        HBaseSparkConf.defaultBatchNum) != batchNum ||
-        sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
-          HBaseSparkConf.defaultCachingSize) != cacheSize ||
-        sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
-          HBaseSparkConf.defaultBlockCacheEnable)
-        != blockCachingEnable) {
+      if (sparkConf.getInt(HBaseSparkConf.QUERY_BATCHSIZE,
+        -1) != batchNum ||
+        sparkConf.getInt(HBaseSparkConf.QUERY_CACHEDROWS,
+          -1) != cacheSize ||
+        sparkConf.getBoolean(HBaseSparkConf.QUERY_CACHEBLOCKS,
+          false) != blockCachingEnable) {
         throw new Exception("HBase Spark configuration cannot be set properly")
       }
       x
@@ -69,9 +69,9 @@ class PartitionFilterSuite extends FunSuite with

     TEST_UTIL.startMiniCluster
     val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")

     sc = new SparkContext("local", "test", sparkConf)
     new HBaseContext(sc, TEST_UTIL.getConfiguration)