From e75e26e3c6cfe7fd378081839d60fc711c1e095f Mon Sep 17 00:00:00 2001 From: tedyu Date: Sat, 19 Dec 2015 15:14:58 -0800 Subject: [PATCH] HBASE-14849 Add option to set block cache to false on SparkSQL executions (Zhan Zhang) --- .../hadoop/hbase/spark/DefaultSource.scala | 56 ++++++------- .../hbase/spark/datasources/Bound.scala | 2 - .../spark/datasources/HBaseSparkConf.scala | 32 ++++++++ .../spark/datasources/HBaseTableScanRDD.scala | 14 ++-- .../SerializableConfiguration.scala | 1 - .../hbase/spark/DefaultSourceSuite.scala | 79 ++++++++----------- .../hadoop/hbase/spark/HBaseTestSource.scala | 63 +++++++++++++++ 7 files changed, 154 insertions(+), 93 deletions(-) create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala create mode 100644 hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index 664cf57b0ad..73cab3c70fa 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -21,7 +21,9 @@ import java.util import java.util.concurrent.ConcurrentLinkedQueue import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.spark.datasources.{HBaseTableScanRDD, HBaseRegion, SerializableConfiguration} +import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf +import org.apache.hadoop.hbase.spark.datasources.HBaseTableScanRDD +import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration import org.apache.hadoop.hbase.types._ import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange} import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} @@ -49,8 +51,6 @@ class DefaultSource extends RelationProvider with Logging { val TABLE_KEY:String = "hbase.table" val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping" - val BATCHING_NUM_KEY:String = "hbase.batching.num" - val CACHING_NUM_KEY:String = "hbase.caching.num" val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources" val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context" val PUSH_DOWN_COLUMN_FILTER:String = "hbase.push.down.column.filter" @@ -71,35 +71,16 @@ class DefaultSource extends RelationProvider with Logging { new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + tableName + "'") val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "") - val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000") - val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000") val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "") val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true") val usePushDownColumnFilter = parameters.getOrElse(PUSH_DOWN_COLUMN_FILTER, "true") - val batchingNum:Int = try { - batchingNumStr.toInt - } catch { - case e:NumberFormatException => throw - new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY +" '" - + batchingNumStr + "'", e) - } - - val cachingNum:Int = try { - cachingNumStr.toInt - } catch { - case e:NumberFormatException => throw - new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY +" '" - + cachingNumStr + "'", e) - } - new HBaseRelation(tableName.get, generateSchemaMappingMap(schemaMappingString), - batchingNum.toInt, -
cachingNum.toInt, hbaseConfigResources, useHBaseReources.equalsIgnoreCase("true"), - usePushDownColumnFilter.equalsIgnoreCase("true"))(sqlContext) + usePushDownColumnFilter.equalsIgnoreCase("true"), + parameters)(sqlContext) } /** @@ -148,10 +129,6 @@ class DefaultSource extends RelationProvider with Logging { * @param tableName HBase table that we plan to read from * @param schemaMappingDefinition SchemaMapping information to map HBase * Qualifiers to SparkSQL columns - * @param batchingNum The batching number to be applied to the - * scan object - * @param cachingNum The caching number to be applied to the - * scan object * @param configResources Optional comma separated list of config resources * to get based on their URI * @param useHBaseContext If true this will look to see if @@ -162,14 +139,26 @@ case class HBaseRelation (val tableName:String, val schemaMappingDefinition: java.util.HashMap[String, SchemaQualifierDefinition], - val batchingNum:Int, - val cachingNum:Int, val configResources:String, val useHBaseContext:Boolean, - val usePushDownColumnFilter:Boolean) ( + val usePushDownColumnFilter:Boolean, + @transient parameters: Map[String, String] ) ( @transient val sqlContext:SQLContext) extends BaseRelation with PrunedFilteredScan with Logging { + // A user-supplied per-table parameter overrides the corresponding global one in SparkConf + val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean) + .getOrElse( + sqlContext.sparkContext.getConf.getBoolean( + HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable)) + val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt) + .getOrElse( + sqlContext.sparkContext.getConf.getInt( + HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize)) + val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt) + .getOrElse(sqlContext.sparkContext.getConf.getInt( + HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum)) + //create or get latest HBaseContext @transient val hbaseContext:HBaseContext = if (useHBaseContext) { LatestHBaseContextCache.latest @@ -321,8 +310,9 @@ case class HBaseRelation (val tableName:String, if (resultRDD == null) { val scan = new Scan() - scan.setBatch(batchingNum) - scan.setCaching(cachingNum) + scan.setCacheBlocks(blockCacheEnable) + scan.setBatch(batchNum) + scan.setCaching(cacheSize) requiredQualifierDefinitionList.foreach( d => scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala index c869f3187ec..0f6098d8a49 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala @@ -17,8 +17,6 @@ package org.apache.hadoop.hbase.spark.datasources -import org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter -import org.apache.spark.Partition import org.apache.hadoop.hbase.spark.hbase._ /** diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala new file mode 100644 index 00000000000..67580b0d027 --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark.datasources + +object HBaseSparkConf { + // These are HBase configuration settings. Users can either set them in SparkConf, where + // they take effect globally, or configure them per table, which overrides the value set + // in SparkConf. If neither is set, the default value takes effect. + val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable" + // The block cache is enabled by default, following the HBase convention, but note + // that this may slow down the system + val defaultBlockCacheEnable = true + val CACHE_SIZE = "spark.hbase.cacheSize" + val defaultCachingSize = 1000 + val BATCH_NUM = "spark.hbase.batchNum" + val defaultBatchNum = 1000 +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala index 958a9868bc8..eb9d39a7177 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala @@ -17,17 +17,11 @@ package org.apache.hadoop.hbase.spark.datasources -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.filter.Filter import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation, SparkSQLPushDownFilter} import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.spark.datasources.HBaseResources._ -import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.{TaskContext, Logging, Partition} +import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition} import org.apache.spark.rdd.RDD import scala.collection.mutable @@ -37,6 +31,7 @@ class HBaseTableScanRDD(relation: HBaseRelation, @transient val filter: Option[SparkSQLPushDownFilter] = None, val columns: Seq[SchemaQualifierDefinition] = Seq.empty )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging { + private def sparkConf = SparkEnv.get.conf var ranges = Seq.empty[Range] def addRange(r: ScanRange) = { val lower = if (r.lowerBound != null && r.lowerBound.length > 0) { @@ -106,8 +101,9 @@ class HBaseTableScanRDD(relation: HBaseRelation, scan.addColumn(d.columnFamilyBytes, d.qualifierBytes) } } - scan.setBatch(relation.batchingNum) - scan.setCaching(relation.cachingNum) + scan.setCacheBlocks(relation.blockCacheEnable) + scan.setBatch(relation.batchNum) + scan.setCaching(relation.cacheSize) filter.foreach(scan.setFilter(_)) scan } diff --git
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala index 2452a525671..42a5c3218b7 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.spark.datasources import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration -import org.apache.spark.util.Utils import scala.util.control.NonFatal diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala index 2cee3a821c5..30ddfc499c0 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -18,10 +18,11 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.hbase.client.{Put, ConnectionFactory} +import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{TableNotFoundException, TableName, HBaseTestingUtility} import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} class DefaultSourceSuite extends FunSuite with @@ -57,8 +58,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { logInfo(" - creating table " + t2TableName) TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily)) logInfo(" - created table") - - sc = new SparkContext("local", "test") + val sparkConf = new SparkConf + sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true") + sparkConf.set(HBaseSparkConf.BATCH_NUM, "100") + sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100") + sc = new SparkContext("local", "test", sparkConf) val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration) try { @@ -139,18 +143,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { df = sqlContext.load("org.apache.hadoop.hbase.spark", Map("hbase.columns.mapping" -> "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b,", - "hbase.table" -> "t1", - "hbase.batching.num" -> "100", - "cachingNum" -> "100")) + "hbase.table" -> "t1")) df.registerTempTable("hbaseTable1") df = sqlContext.load("org.apache.hadoop.hbase.spark", Map("hbase.columns.mapping" -> "KEY_FIELD INT :key, A_FIELD STRING c:a, B_FIELD STRING c:b,", - "hbase.table" -> "t2", - "hbase.batching.num" -> "100", - "cachingNum" -> "100")) + "hbase.table" -> "t2")) df.registerTempTable("hbaseTable2") } @@ -635,49 +635,32 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { } } - test("Test bad hbase.batching.num type") { - intercept[IllegalArgumentException] { - df = sqlContext.load("org.apache.hadoop.hbase.spark", - Map("hbase.columns.mapping" -> - "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,", - "hbase.table" -> "t1", "hbase.batching.num" -> "foo")) - - df.registerTempTable("hbaseIntWrongTypeTmp") - - val result = sqlContext.sql("SELECT KEY_FIELD, " + - "B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp") - - assert(result.count() ==
5) - - val localResult = result.take(5) - localResult.length - - val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() - assert(executionRules.dynamicLogicExpression == null) - + test("Test HBaseSparkConf matching") { + val df = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource", + Map("cacheSize" -> "100", + "batchNum" -> "100", + "blockCacheingEnable" -> "true", "rowNum" -> "10")) + assert(df.count() == 10) + val df1 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource", + Map("cacheSize" -> "1000", + "batchNum" -> "100", "blockCacheingEnable" -> "true", "rowNum" -> "10")) + intercept[Exception] { + assert(df1.count() == 10) } - } - - test("Test bad hbase.caching.num type") { - intercept[IllegalArgumentException] { - df = sqlContext.load("org.apache.hadoop.hbase.spark", - Map("hbase.columns.mapping" -> - "KEY_FIELD FOOBAR :key, A_FIELD STRING c:a, B_FIELD STRING c:b, I_FIELD STRING c:i,", - "hbase.table" -> "t1", "hbase.caching.num" -> "foo")) - - df.registerTempTable("hbaseIntWrongTypeTmp") - - val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, " + - "I_FIELD FROM hbaseIntWrongTypeTmp") - - val localResult = result.take(10) - assert(localResult.length == 5) - - val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() - assert(executionRules.dynamicLogicExpression == null) + val df2 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource", + Map("cacheSize" -> "100", + "batchNum" -> "1000", "blockCacheingEnable" -> "true", "rowNum" -> "10")) + intercept[Exception] { + assert(df2.count() == 10) + } + val df3 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource", + Map("cacheSize" -> "100", + "batchNum" -> "100", "blockCacheingEnable" -> "false", "rowNum" -> "10")) + intercept[Exception] { + assert(df3.count() == 10) } } diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala new file mode 100644 index 00000000000..83465d91ad3 --- /dev/null +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf +import org.apache.spark.SparkEnv +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +class HBaseTestSource extends RelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + DummyScan( + parameters("cacheSize").toInt, + parameters("batchNum").toInt, + parameters("blockCacheingEnable").toBoolean, + parameters("rowNum").toInt)(sqlContext) + } +} + +case class DummyScan( + cacheSize: Int, + batchNum: Int, + blockCachingEnable: Boolean, + rowNum: Int)(@transient val sqlContext: SQLContext) + extends BaseRelation with TableScan { + private def sparkConf = SparkEnv.get.conf + override def schema: StructType = + StructType(StructField("i", IntegerType, nullable = false) :: Nil) + + override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum) + .map(Row(_)) + .map { x => + if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM, + HBaseSparkConf.defaultBatchNum) != batchNum || + sparkConf.getInt(HBaseSparkConf.CACHE_SIZE, + HBaseSparkConf.defaultCachingSize) != cacheSize || + sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE, + HBaseSparkConf.defaultBlockCacheEnable) + != blockCachingEnable) { + throw new Exception("HBase Spark configuration was not set properly") + } + x + } +}
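Reviewer note: below is a minimal usage sketch of the configuration knobs this patch introduces, using the same Spark 1.x sqlContext.load API as the tests above. The keys come from HBaseSparkConf; the table name, column mapping, and local-mode context are illustrative placeholders, not part of the patch.

```scala
import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Global settings: apply to every HBase-backed relation created from this context.
val sparkConf = new SparkConf()
  .set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "false") // spark.hbase.blockcache.enable
  .set(HBaseSparkConf.CACHE_SIZE, "500")           // spark.hbase.cacheSize
  .set(HBaseSparkConf.BATCH_NUM, "500")            // spark.hbase.batchNum
val sc = new SparkContext("local", "hbase-spark-example", sparkConf)
val sqlContext = new SQLContext(sc)

// Per-table override: an option passed to the data source wins over SparkConf,
// which in turn wins over the defaults in HBaseSparkConf. "t1" and the column
// mapping are placeholders for a real table.
val df = sqlContext.load("org.apache.hadoop.hbase.spark",
  Map("hbase.table" -> "t1",
    "hbase.columns.mapping" -> "KEY_FIELD STRING :key, A_FIELD STRING c:a,",
    HBaseSparkConf.BLOCK_CACHE_ENABLE -> "true")) // re-enable the block cache for this table only
df.registerTempTable("hbaseTable1")
```

The precedence follows directly from HBaseRelation: each setting is resolved as parameters.get(key), falling back to sparkContext.getConf and then to the HBaseSparkConf default, so disabling the block cache globally while re-enabling it for one hot table (or vice versa) requires no code changes.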