From ee3a469641ec3e7385571ab4713d559b75bf50eb Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 9 Mar 2018 07:58:53 -0800
Subject: [PATCH] HBASE-16179 Fix compilation errors when building hbase-spark
 against Spark 2.0

Signed-off-by: Mike Drob
---
 hbase-spark-it/pom.xml                         |  14 ++-
 .../spark/IntegrationTestSparkBulkLoad.java    |   4 +-
 hbase-spark/pom.xml                            |   8 +-
 .../hbase/spark/SparkSQLPushDownFilter.java    |   1 +
 .../hadoop/hbase/spark/DefaultSource.scala     |   1 -
 .../hbase/spark/HBaseConnectionCache.scala     |   1 -
 .../hadoop/hbase/spark/HBaseContext.scala      |   4 +-
 .../hadoop/hbase/spark/JavaHBaseContext.scala  |  16 +--
 .../apache/hadoop/hbase/spark/Logging.scala    | 119 ++++++++++++++++++
 .../hadoop/hbase/spark/NewHBaseRDD.scala       |  12 +-
 .../spark/datasources/HBaseTableScanRDD.scala  |   5 +-
 .../spark/datasources/JavaBytesEncoder.scala   |   2 +-
 .../spark/datasources/NaiveEncoder.scala       |   2 +-
 .../hbase/DataTypeParserWrapper.scala          |  16 ++-
 .../datasources/hbase/HBaseTableCatalog.scala  |   5 +-
 .../spark/sql/datasources/hbase/Utils.scala    |   6 +-
 .../hadoop/hbase/spark/BulkLoadSuite.scala     |   3 +-
 .../hbase/spark/DefaultSourceSuite.scala       |  12 +-
 .../spark/DynamicLogicExpressionSuite.scala    |   2 +-
 .../hbase/spark/HBaseCatalogSuite.scala        |   2 +-
 .../spark/HBaseConnectionCacheSuite.scala      |   2 +-
 .../hbase/spark/HBaseContextSuite.scala        |   3 +-
 .../spark/HBaseDStreamFunctionsSuite.scala     |   3 +-
 .../hbase/spark/HBaseRDDFunctionsSuite.scala   |   3 +-
 .../hbase/spark/PartitionFilterSuite.scala     |   3 +-
 25 files changed, 189 insertions(+), 60 deletions(-)
 create mode 100644 hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/Logging.scala

diff --git a/hbase-spark-it/pom.xml b/hbase-spark-it/pom.xml
index c1c39679844..2cac0f98f52 100644
--- a/hbase-spark-it/pom.xml
+++ b/hbase-spark-it/pom.xml
@@ -33,9 +33,11 @@
-    <spark.version>1.6.0</spark.version>
-    <scala.version>2.10.4</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
+    <spark.version>2.1.1</spark.version>
+
+    <scala.version>2.11.8</scala.version>
+    <scala.binary.version>2.11</scala.binary.version>
     **/Test*.java
     **/IntegrationTest*.java
@@ -263,6 +265,12 @@
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-xml_2.11</artifactId>
+      <version>1.0.4</version>
+      <scope>provided</scope>
+    </dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
index 1273a51f076..d13dd17c87f 100644
--- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
@@ -319,7 +319,7 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
       PairFlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, SparkLinkKey, SparkLinkChain> {

     @Override
-    public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable,
+    public Iterator<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable,
         Result> v) throws Exception {
       Result value = v._2();
       long longRk = Bytes.toLong(value.getRow());
@@ -334,7 +334,7 @@ public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
             new Tuple2<>(new SparkLinkKey(chainId, order), new SparkLinkChain(longRk, next));
         list.add(tuple2);
       }
-      return list;
+      return list.iterator();
     }
   }

diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
index a3a41fd79bb..a6a04ad868f 100644
--- a/hbase-spark/pom.xml
+++ b/hbase-spark/pom.xml
@@ -30,9 +30,11 @@
   <artifactId>hbase-spark</artifactId>
   <name>Apache HBase - Spark</name>
-    <spark.version>1.6.0</spark.version>
-    <scala.version>2.10.4</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
+    <spark.version>2.1.1</spark.version>
+
+    <scala.version>2.11.8</scala.version>
+    <scala.binary.version>2.11</scala.binary.version>
     ${project.basedir}/..
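Note (not part of the patch): the IntegrationTestSparkBulkLoad hunk above tracks a Spark 2.0 Java API change. In Spark 1.x, FlatMapFunction.call() and PairFlatMapFunction.call() returned an Iterable; in Spark 2.x they return a java.util.Iterator, which is why the method signature changes and the body now returns list.iterator(). A minimal sketch of the same migration in user code, assuming a hypothetical SplitWords function that is not part of HBase:

// Illustrative sketch only: adapting a FlatMapFunction to the Spark 2.x signature.
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;

public class SplitWords implements FlatMapFunction<String, String> {
  @Override
  public Iterator<String> call(String line) throws Exception {
    List<String> words = Arrays.asList(line.split(" "));
    return words.iterator(); // Spark 1.x code would have returned 'words' directly
  }
}
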
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java index e9bb511ad43..694fb6ab8c6 100644 --- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.spark.datasources.BytesEncoder; import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder; diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index a488dd333a1..08df63546cf 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.CellUtil import org.apache.hadoop.mapred.JobConf -import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.datasources.hbase.{Utils, Field, HBaseTableCatalog} import org.apache.spark.sql.{DataFrame, SaveMode, Row, SQLContext} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala index 7eb2090a3b4..8eb4dd921f3 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory import org.apache.hadoop.hbase.security.{User, UserProvider} import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf import org.apache.hadoop.hbase.{HConstants, TableName} -import org.apache.spark.Logging import scala.collection.mutable diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index d1880576374..0156e70726a 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -39,7 +39,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ import org.apache.hadoop.hbase.client._ import scala.reflect.ClassTag -import org.apache.spark.{Logging, SerializableWritable, SparkContext} +import org.apache.spark.{SerializableWritable, SparkContext} import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, TableInputFormat, IdentityTableMapper} import org.apache.hadoop.hbase.io.ImmutableBytesWritable @@ -60,7 +60,7 @@ import scala.collection.mutable * to the working and managing the life cycle of Connections. 
*/ @InterfaceAudience.Public -class HBaseContext(@transient sc: SparkContext, +class HBaseContext(@transient val sc: SparkContext, @transient val config: Configuration, val tmpHdfsConfgFile: String = null) extends Serializable with Logging { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala index fe4b65f66c0..be6581a3c7b 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala @@ -29,6 +29,8 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction} import org.apache.spark.streaming.api.java.JavaDStream +import java.lang.Iterable + import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -41,8 +43,8 @@ import scala.reflect.ClassTag * @param config This is the config information to out HBase cluster */ @InterfaceAudience.Public -class JavaHBaseContext(@transient jsc: JavaSparkContext, - @transient config: Configuration) extends Serializable { +class JavaHBaseContext(@transient val jsc: JavaSparkContext, + @transient val config: Configuration) extends Serializable { val hbaseContext = new HBaseContext(jsc.sc, config) /** @@ -107,15 +109,9 @@ class JavaHBaseContext(@transient jsc: JavaSparkContext, def mapPartitions[T, R](javaRdd: JavaRDD[T], f: FlatMapFunction[(java.util.Iterator[T], Connection), R]): JavaRDD[R] = { - - def fn = (it: Iterator[T], conn: Connection) => - asScalaIterator( - f.call((asJavaIterator(it), conn)).iterator() - ) - JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd, - (iterator: Iterator[T], connection: Connection) => - fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R]) + (it: Iterator[T], conn: Connection) => + f.call(it, conn))(fakeClassTag[R]))(fakeClassTag[R]) } /** diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/Logging.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/Logging.scala new file mode 100644 index 00000000000..9a67477dade --- /dev/null +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/Logging.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark + +import org.apache.log4j.LogManager +import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.impl.StaticLoggerBinder + +/** + * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows + * logging messages at different levels using methods that only evaluate parameters lazily if the + * log level is enabled. 
+ * Logging is private in Spark 2.0 + * This is to isolate incompatibilties across Spark releases. + */ +trait Logging { + + // Make the log field transient so that objects with Logging can + // be serialized and used on another machine + @transient private var log_ : Logger = null + + // Method to get the logger name for this object + protected def logName = { + // Ignore trailing $'s in the class names for Scala objects + this.getClass.getName.stripSuffix("$") + } + + // Method to get or create the logger for this object + protected def log: Logger = { + if (log_ == null) { + initializeLogIfNecessary(false) + log_ = LoggerFactory.getLogger(logName) + } + log_ + } + + // Log methods that take only a String + protected def logInfo(msg: => String) { + if (log.isInfoEnabled) log.info(msg) + } + + protected def logDebug(msg: => String) { + if (log.isDebugEnabled) log.debug(msg) + } + + protected def logTrace(msg: => String) { + if (log.isTraceEnabled) log.trace(msg) + } + + protected def logWarning(msg: => String) { + if (log.isWarnEnabled) log.warn(msg) + } + + protected def logError(msg: => String) { + if (log.isErrorEnabled) log.error(msg) + } + + // Log methods that take Throwables (Exceptions/Errors) too + protected def logInfo(msg: => String, throwable: Throwable) { + if (log.isInfoEnabled) log.info(msg, throwable) + } + + protected def logDebug(msg: => String, throwable: Throwable) { + if (log.isDebugEnabled) log.debug(msg, throwable) + } + + protected def logTrace(msg: => String, throwable: Throwable) { + if (log.isTraceEnabled) log.trace(msg, throwable) + } + + protected def logWarning(msg: => String, throwable: Throwable) { + if (log.isWarnEnabled) log.warn(msg, throwable) + } + + protected def logError(msg: => String, throwable: Throwable) { + if (log.isErrorEnabled) log.error(msg, throwable) + } + + protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = { + if (!Logging.initialized) { + Logging.initLock.synchronized { + if (!Logging.initialized) { + initializeLogging(isInterpreter) + } + } + } + } + + private def initializeLogging(isInterpreter: Boolean): Unit = { + // Don't use a logger in here, as this is itself occurring during initialization of a logger + // If Log4j 1.2 is being used, but is not initialized, load a default properties file + val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr + Logging.initialized = true + + // Force a call into slf4j to initialize it. 
Avoids this happening from multiple threads + // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html + log + } +} + +private object Logging { + @volatile private var initialized = false + val initLock = new Object() +} diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala index 6d0a2d20f89..7088ce90bf4 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala @@ -24,12 +24,12 @@ import org.apache.spark.rdd.NewHadoopRDD import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} @InterfaceAudience.Public -class NewHBaseRDD[K,V](@transient sc : SparkContext, - @transient inputFormatClass: Class[_ <: InputFormat[K, V]], - @transient keyClass: Class[K], - @transient valueClass: Class[V], - @transient conf: Configuration, - val hBaseContext: HBaseContext) extends NewHadoopRDD(sc,inputFormatClass, keyClass, valueClass, conf) { +class NewHBaseRDD[K,V](@transient val sc : SparkContext, + @transient val inputFormatClass: Class[_ <: InputFormat[K, V]], + @transient val keyClass: Class[K], + @transient val valueClass: Class[V], + @transient private val __conf: Configuration, + val hBaseContext: HBaseContext) extends NewHadoopRDD(sc, inputFormatClass, keyClass, valueClass, __conf) { override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { hBaseContext.applyCreds() diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala index 1ca1b454f14..efeaa7cf71b 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.spark.datasources.HBaseResources._ import org.apache.hadoop.hbase.util.ShutdownHookManager import org.apache.spark.sql.datasources.hbase.Field -import org.apache.spark.{SparkEnv, TaskContext, Logging, Partition} +import org.apache.spark.{SparkEnv, TaskContext, Partition} import org.apache.spark.rdd.RDD import scala.collection.mutable @@ -36,7 +36,8 @@ class HBaseTableScanRDD(relation: HBaseRelation, val hbaseContext: HBaseContext, @transient val filter: Option[SparkSQLPushDownFilter] = None, val columns: Seq[Field] = Seq.empty - )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging { + ) extends RDD[Result](relation.sqlContext.sparkContext, Nil) + { private def sparkConf = SparkEnv.get.conf @transient var ranges = Seq.empty[Range] @transient var points = Seq.empty[Array[Byte]] diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala index 6a5018947f7..95d45475bea 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala @@ -18,11 +18,11 @@ package org.apache.hadoop.hbase.spark.datasources import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.spark.Logging import 
org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.Logging import org.apache.spark.sql.types._ /** diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala index 61382426f8b..a2a68281135 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.spark.datasources import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.Logging import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala index 3df23f958ee..37ee34694fb 100644 --- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala +++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala @@ -17,15 +17,13 @@ package org.apache.spark.sql.datasources.hbase -import org.apache.spark.sql.catalyst.SqlLexical -import org.apache.spark.sql.catalyst.util.DataTypeParser +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types.DataType -// TODO: Only used in test suite. 
-object DataTypeParserWrapper { - lazy val dataTypeParser = new DataTypeParser { - override val lexical = new SqlLexical - } - - def parse(dataTypeString: String): DataType = dataTypeParser.toDataType(dataTypeString) +trait DataTypeParser { + def parse(dataTypeString: String): DataType +} + +object DataTypeParserWrapper extends DataTypeParser{ + def parse(dataTypeString: String): DataType = CatalystSqlParser.parseDataType(dataTypeString) } diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala index 65a3bc70e29..6cb0b187062 100644 --- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala +++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala @@ -19,12 +19,11 @@ package org.apache.spark.sql.datasources.hbase import org.apache.avro.Schema import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.spark.SchemaConverters import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.util.DataTypeParser import org.apache.spark.sql.types._ import org.json4s.jackson.JsonMethods._ @@ -79,7 +78,7 @@ case class Field( } val dt = { - sType.map(DataTypeParser.parse(_)).getOrElse{ + sType.map(DataTypeParserWrapper.parse(_)).getOrElse{ schema.map{ x=> SchemaConverters.toSqlType(x).dataType }.get diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala index 36b8bbf68e6..5db5965f614 100644 --- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala +++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.datasources.hbase import org.apache.hadoop.hbase.spark.AvroSerdes import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.sql.execution.SparkSqlSerializer +//import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -60,8 +60,8 @@ object Utils { val newArray = new Array[Byte](length) System.arraycopy(src, offset, newArray, 0, length) newArray - // TODO: add more data type support - case _ => SparkSqlSerializer.deserialize[Any](src) + // TODO: SparkSqlSerializer.deserialize[Any](src) + case _ => throw new Exception(s"unsupported data type ${f.dt}") } } } diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala index a42732788d5..71dbc2ed0b3 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala @@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hbase.client.{Get, ConnectionFactory} import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile} +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName} import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.SparkContext import org.junit.rules.TemporaryFolder import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala index 3bce0415633..3669e3b4401 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -20,13 +20,14 @@ package org.apache.hadoop.hbase.spark import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.hbase.client.{ConnectionFactory, Put} +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName} import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} case class HBaseRecord( @@ -377,8 +378,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(results.length == 2) - assert(executionRules.dynamicLogicExpression.toExpressionString. - equals("( KEY_FIELD <= 0 AND KEY_FIELD >= 1 )")) + val expr = executionRules.dynamicLogicExpression.toExpressionString + assert(expr.equals("( ( KEY_FIELD isNotNull AND KEY_FIELD <= 0 ) AND KEY_FIELD >= 1 )"), expr) assert(executionRules.rowKeyFilter.points.size == 0) assert(executionRules.rowKeyFilter.ranges.size == 1) @@ -653,8 +654,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(localResult(0).getInt(2) == 8) val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() - assert(executionRules.dynamicLogicExpression.toExpressionString. 
- equals("( I_FIELD > 0 AND I_FIELD < 1 )")) + val expr = executionRules.dynamicLogicExpression.toExpressionString + logInfo(expr) + assert(expr.equals("( ( I_FIELD isNotNull AND I_FIELD > 0 ) AND I_FIELD < 1 )"), expr) } diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala index bc833e8e603..709d76c8619 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.spark import java.util +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, JavaBytesEncoder} import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.Logging import org.apache.spark.sql.types._ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala index 49e2f6c340e..f9e24c8bd3f 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala @@ -17,9 +17,9 @@ package org.apache.hadoop.hbase.spark +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.spark.datasources.{DoubleSerDes, SerDes} import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.Logging import org.apache.spark.sql.datasources.hbase.{DataTypeParserWrapper, HBaseTableCatalog} import org.apache.spark.sql.types._ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index b3fdd4edfbf..7d94cd2acc5 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -22,9 +22,9 @@ import scala.util.Random import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator, Connection, BufferedMutatorParams, Admin, TableBuilder} +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName -import org.apache.spark.Logging import org.scalatest.FunSuite case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) { diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala index 1e1e52dd4a2..6e6312eeb35 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseContextSuite.scala @@ -20,7 +20,8 @@ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{ CellUtil, TableName, HBaseTestingUtility} -import org.apache.spark.{SparkException, Logging, SparkContext} +import org.apache.hadoop.hbase.spark.Logging +import org.apache.spark.{SparkException, SparkContext} import 
org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} class HBaseContextSuite extends FunSuite with diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala index e6767aedeeb..2ad640df04d 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctionsSuite.scala @@ -17,11 +17,12 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.SparkContext import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala index 89148c39a5d..17921c35a28 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctionsSuite.scala @@ -17,10 +17,11 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility} import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._ -import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.SparkContext import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import scala.collection.mutable diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala index f47a319b13d..d6458532005 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala @@ -18,10 +18,11 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf +import org.apache.hadoop.hbase.spark.Logging import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility} import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} case class FilterRangeRecord(