From 6bb59382263ad13d9ec2f51087a09266edef170c Mon Sep 17 00:00:00 2001 From: Jerry He Date: Sat, 4 Mar 2017 12:53:21 -0800 Subject: [PATCH] HBASE-14375 Define public API for spark integration module --- .../hadoop/hbase/HBaseInterfaceAudience.java | 2 ++ .../hadoop/hbase/spark/SparkSQLPushDownFilter.java | 2 ++ .../hadoop/hbase/spark/BulkLoadPartitioner.scala | 4 ++++ .../hadoop/hbase/spark/ByteArrayComparable.scala | 4 ++++ .../hadoop/hbase/spark/ByteArrayWrapper.scala | 4 ++++ .../spark/ColumnFamilyQualifierMapKeyWrapper.scala | 4 ++++ .../apache/hadoop/hbase/spark/DefaultSource.scala | 14 ++++++++++++-- .../hbase/spark/DynamicLogicExpression.scala | 13 +++++++++++++ .../hbase/spark/FamiliesQualifiersValues.scala | 5 +++++ .../hbase/spark/FamilyHFileWriteOptions.scala | 5 +++++ .../apache/hadoop/hbase/spark/HBaseContext.scala | 4 ++++ .../hadoop/hbase/spark/HBaseDStreamFunctions.scala | 4 ++++ .../hadoop/hbase/spark/HBaseRDDFunctions.scala | 4 ++++ .../hadoop/hbase/spark/JavaHBaseContext.scala | 4 ++++ .../hadoop/hbase/spark/KeyFamilyQualifier.scala | 4 ++++ .../apache/hadoop/hbase/spark/NewHBaseRDD.scala | 4 ++++ .../hadoop/hbase/spark/datasources/Bound.scala | 8 ++++++++ .../hbase/spark/datasources/HBaseResources.scala | 8 ++++++++ .../hbase/spark/datasources/HBaseSparkConf.scala | 5 +++++ .../spark/datasources/HBaseTableScanRDD.scala | 2 ++ .../hbase/spark/datasources/JavaBytesEncoder.scala | 11 +++++++++++ .../hbase/spark/datasources/NaiveEncoder.scala | 2 ++ .../hbase/spark/datasources/SchemaConverters.scala | 8 ++++++-- .../hadoop/hbase/spark/datasources/SerDes.scala | 3 ++- .../datasources/SerializableConfiguration.scala | 2 ++ .../datasources/hbase/DataTypeParserWrapper.scala | 1 + .../sql/datasources/hbase/HBaseTableCatalog.scala | 11 +++++++++++ .../apache/spark/sql/datasources/hbase/Utils.scala | 3 +++ 28 files changed, 140 insertions(+), 5 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java index 2e58913345f..cb42e4861e4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java @@ -35,6 +35,8 @@ public final class HBaseInterfaceAudience { public static final String COPROC = "Coprocesssor"; public static final String REPLICATION = "Replication"; public static final String PHOENIX = "Phoenix"; + public static final String SPARK = "Spark"; + /** * Denotes class names that appear in user facing configuration files. 
*/ diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java index 3e90fe14eef..398e6a2d669 100644 --- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.spark; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.spark.datasources.BytesEncoder; @@ -43,6 +44,7 @@ import com.google.protobuf.ByteString; * by SparkSQL so that we have make the filters at the region server level * and avoid sending the data back to the client to be filtered. */ +@InterfaceAudience.Private public class SparkSQLPushDownFilter extends FilterBase{ protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class); diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala index 39d34037ac2..b11e313f59b 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.spark import java.util import java.util.Comparator +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.Partitioner @@ -29,6 +31,8 @@ import org.apache.spark.Partitioner * * @param startKeys The start keys for the given table */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) extends Partitioner { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala index fce92fb8b91..deef6ba7b1e 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala @@ -19,8 +19,12 @@ package org.apache.hadoop.hbase.spark +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes +@InterfaceAudience.Public +@InterfaceStability.Evolving class ByteArrayComparable(val bytes:Array[Byte], val offset:Int = 0, var length:Int = -1) extends Comparable[ByteArrayComparable] { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala index 9167f75faec..622da1776ad 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.spark import java.io.Serializable +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import 
org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes /** @@ -26,6 +28,8 @@ import org.apache.hadoop.hbase.util.Bytes * * @param value The Byte Array value */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class ByteArrayWrapper (var value:Array[Byte]) extends Comparable[ByteArrayWrapper] with Serializable { override def compareTo(valueOther: ByteArrayWrapper): Int = { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala index f223d1b8bd4..8d0885c9bf8 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala @@ -17,6 +17,8 @@ package org.apache.hadoop.hbase.spark +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes /** @@ -30,6 +32,8 @@ import org.apache.hadoop.hbase.util.Bytes * @param qualifierOffSet Offset of qualifier value in the array * @param qualifierLength Length of the qualifier value with in the array */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte], val columnFamilyOffSet:Int, val columnFamilyLength:Int, diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index b04abd869e3..a8b2ab8db61 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.spark import java.util import java.util.concurrent.ConcurrentLinkedQueue +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat @@ -45,6 +46,9 @@ import scala.collection.mutable * DefaultSource for integration with Spark's dataframe datasources. * This class will produce a relationProvider based on input given to it from spark * + * This class needs to stay in the current package 'org.apache.hadoop.hbase.spark' + * for Spark to match the hbase data source name. + * * In all this DefaultSource support the following datasource functionality * - Scan range pruning through filter push down logic based on rowKeys * - Filter push down logic on HBase Cells @@ -52,6 +56,7 @@ import scala.collection.mutable * - Type conversions of basic SQL types. All conversions will be * Through the HBase Bytes object commands. 
*/ +@InterfaceAudience.Private class DefaultSource extends RelationProvider with CreatableRelationProvider with Logging { /** * Is given input from SparkSQL to construct a BaseRelation @@ -85,6 +90,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider wit * * @param sqlContext SparkSQL context */ +@InterfaceAudience.Private case class HBaseRelation ( @transient parameters: Map[String, String], userSpecifiedSchema: Option[StructType] @@ -569,6 +575,7 @@ case class HBaseRelation ( * @param lowerBound Lower bound of scan * @param isLowerBoundEqualTo Include lower bound value in the results */ +@InterfaceAudience.Private class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean) extends Serializable { @@ -722,6 +729,7 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, * @param currentPoint the initial point when the filter is created * @param currentRange the initial scanRange when the filter is created */ +@InterfaceAudience.Private class ColumnFilter (currentPoint:Array[Byte] = null, currentRange:ScanRange = null, var points:mutable.MutableList[Array[Byte]] = @@ -851,6 +859,7 @@ class ColumnFilter (currentPoint:Array[Byte] = null, * Also contains merge commends that will consolidate the filters * per column name */ +@InterfaceAudience.Private class ColumnFilterCollection { val columnFilterMap = new mutable.HashMap[String, ColumnFilter] @@ -916,6 +925,7 @@ class ColumnFilterCollection { * Status object to store static functions but also to hold last executed * information that can be used for unit testing. */ +@InterfaceAudience.Private object DefaultSourceStaticUtils { val rawInteger = new RawInteger @@ -1058,6 +1068,7 @@ object DefaultSourceStaticUtils { * @param currentPoint the initial point when the filter is created * @param currentRange the initial scanRange when the filter is created */ +@InterfaceAudience.Private class RowKeyFilter (currentPoint:Array[Byte] = null, currentRange:ScanRange = new ScanRange(null, true, new Array[Byte](0), true), @@ -1208,7 +1219,6 @@ class RowKeyFilter (currentPoint:Array[Byte] = null, } } - - +@InterfaceAudience.Private class ExecutionRuleForUnitTesting(val rowKeyFilter: RowKeyFilter, val dynamicLogicExpression: DynamicLogicExpression) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala index 1a1d478ebac..7df56fd0f97 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark import java.util +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.spark.datasources.{BytesEncoder, JavaBytesEncoder} import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder import org.apache.hadoop.hbase.util.Bytes @@ -33,6 +34,7 @@ import org.apache.spark.sql.types._ * A logic tree can be written out as a string and reconstructed from that string * */ +@InterfaceAudience.Private trait DynamicLogicExpression { def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable], valueFromQueryValueArray:Array[Array[Byte]]): Boolean @@ -53,6 +55,7 @@ trait DynamicLogicExpression { } } +@InterfaceAudience.Private trait CompareTrait { self: 
DynamicLogicExpression => def columnName: String @@ -68,6 +71,7 @@ trait CompareTrait { } } +@InterfaceAudience.Private class AndLogicExpression (val leftExpression:DynamicLogicExpression, val rightExpression:DynamicLogicExpression) extends DynamicLogicExpression{ @@ -87,6 +91,7 @@ class AndLogicExpression (val leftExpression:DynamicLogicExpression, } } +@InterfaceAudience.Private class OrLogicExpression (val leftExpression:DynamicLogicExpression, val rightExpression:DynamicLogicExpression) extends DynamicLogicExpression{ @@ -105,6 +110,7 @@ class OrLogicExpression (val leftExpression:DynamicLogicExpression, } } +@InterfaceAudience.Private class EqualLogicExpression (val columnName:String, val valueFromQueryIndex:Int, val isNot:Boolean) extends DynamicLogicExpression{ @@ -125,6 +131,7 @@ class EqualLogicExpression (val columnName:String, } } +@InterfaceAudience.Private class IsNullLogicExpression (val columnName:String, val isNot:Boolean) extends DynamicLogicExpression{ override def execute(columnToCurrentRowValueMap: @@ -140,6 +147,7 @@ class IsNullLogicExpression (val columnName:String, } } +@InterfaceAudience.Private class GreaterThanLogicExpression (override val columnName:String, override val valueFromQueryIndex:Int) extends DynamicLogicExpression with CompareTrait{ @@ -149,6 +157,7 @@ class GreaterThanLogicExpression (override val columnName:String, } } +@InterfaceAudience.Private class GreaterThanOrEqualLogicExpression (override val columnName:String, override val valueFromQueryIndex:Int) extends DynamicLogicExpression with CompareTrait{ @@ -158,6 +167,7 @@ class GreaterThanOrEqualLogicExpression (override val columnName:String, } } +@InterfaceAudience.Private class LessThanLogicExpression (override val columnName:String, override val valueFromQueryIndex:Int) extends DynamicLogicExpression with CompareTrait { @@ -167,6 +177,7 @@ class LessThanLogicExpression (override val columnName:String, } } +@InterfaceAudience.Private class LessThanOrEqualLogicExpression (val columnName:String, val valueFromQueryIndex:Int) extends DynamicLogicExpression with CompareTrait{ @@ -176,6 +187,7 @@ class LessThanOrEqualLogicExpression (val columnName:String, } } +@InterfaceAudience.Private class PassThroughLogicExpression() extends DynamicLogicExpression { override def execute(columnToCurrentRowValueMap: util.HashMap[String, ByteArrayComparable], @@ -190,6 +202,7 @@ class PassThroughLogicExpression() extends DynamicLogicExpression { } } +@InterfaceAudience.Private object DynamicLogicExpressionBuilder { def build(expressionString: String, encoder: BytesEncoder): DynamicLogicExpression = { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala index 33b36092dc9..e995260ff9d 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamiliesQualifiersValues.scala @@ -18,10 +18,15 @@ package org.apache.hadoop.hbase.spark import java.util +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + /** * This object is a clean way to store and sort all cells that will be bulk * loaded into a single row */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class FamiliesQualifiersValues extends Serializable { //Tree maps are used because we need the results to // be sorted when we read them 
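
The DefaultSource and HBaseRelation types marked @InterfaceAudience.Private above are not meant to be constructed by users; they are reached through Spark's data source lookup, which is why DefaultSource has to stay in the org.apache.hadoop.hbase.spark package. A minimal sketch of the public entry point from SparkSQL, assuming the HBaseTableCatalog.tableCatalog option key and a catalog JSON string as defined by this module (names here are illustrative, not taken verbatim from the patch):

  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

  // The format string is the package name that Spark resolves to DefaultSource.
  def readAsDataFrame(sqlContext: SQLContext, catalogJson: String): DataFrame =
    sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalogJson))
      .format("org.apache.hadoop.hbase.spark")
      .load()

Only HBaseTableCatalog and the option map are part of the public surface on this path; the relation and filter classes in DefaultSource.scala stay private.
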
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala index 7dbe140259a..3b517a2db07 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/FamilyHFileWriteOptions.scala @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.spark import java.io.Serializable +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + /** * This object will hold optional data for how a given column family's * writer will work @@ -29,6 +32,8 @@ import java.io.Serializable * @param dataBlockEncoding String to define the data block encoding to be used * in the HFile */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class FamilyHFileWriteOptions( val compression:String, val bloomType: String, val blockSize: Int, diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index aeffecba222..3fc0ccc71f2 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -22,6 +22,8 @@ import java.util import java.util.UUID import javax.management.openmbean.KeyAlreadyExistsException +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.fs.HFileSystem import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.io.compress.Compression @@ -58,6 +60,8 @@ import scala.collection.mutable * of disseminating the configuration information * to the working and managing the life cycle of Connections. 
*/ +@InterfaceAudience.Public +@InterfaceStability.Evolving class HBaseContext(@transient sc: SparkContext, @transient val config: Configuration, val tmpHdfsConfgFile: String = null) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala index d563a29fa13..be2db026025 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseDStreamFunctions.scala @@ -17,6 +17,8 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.streaming.dstream.DStream @@ -27,6 +29,8 @@ import scala.reflect.ClassTag * HBaseDStreamFunctions contains a set of implicit functions that can be * applied to a Spark DStream so that we can easily interact with HBase */ +@InterfaceAudience.Public +@InterfaceStability.Evolving object HBaseDStreamFunctions { /** diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala index 601642adb61..bb4439c9b5e 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.spark import java.util import org.apache.hadoop.hbase.{HConstants, TableName} +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.rdd.RDD @@ -30,6 +32,8 @@ import scala.reflect.ClassTag * HBaseRDDFunctions contains a set of implicit functions that can be * applied to a Spark RDD so that we can easily interact with HBase */ +@InterfaceAudience.Public +@InterfaceStability.Evolving object HBaseRDDFunctions { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala index a99e0e3af44..cd5406f58ce 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/JavaHBaseContext.scala @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} @@ -36,6 +38,8 @@ import scala.reflect.ClassTag * @param jsc This is the JavaSparkContext that we will wrap * @param config This is the config information to out HBase cluster */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class JavaHBaseContext(@transient jsc: JavaSparkContext, @transient config: Configuration) extends Serializable { val hbaseContext = new 
HBaseContext(jsc.sc, config) diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala index b2eda2c9bdd..cdead5d0b70 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/KeyFamilyQualifier.scala @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.spark import java.io.Serializable +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes /** @@ -30,6 +32,8 @@ import org.apache.hadoop.hbase.util.Bytes * @param family Record ColumnFamily * @param qualifier Cell Qualifier */ +@InterfaceAudience.Public +@InterfaceStability.Evolving class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte]) extends Comparable[KeyFamilyQualifier] with Serializable { override def compareTo(o: KeyFamilyQualifier): Int = { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala index 8e5e8f9e98f..f19c590ff5b 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/NewHBaseRDD.scala @@ -18,10 +18,14 @@ package org.apache.hadoop.hbase.spark import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.mapreduce.InputFormat import org.apache.spark.rdd.NewHadoopRDD import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +@InterfaceAudience.Public +@InterfaceStability.Evolving class NewHBaseRDD[K,V](@transient sc : SparkContext, @transient inputFormatClass: Class[_ <: InputFormat[K, V]], @transient keyClass: Class[K], diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala index 8e03e958d0d..0abcd047db4 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.spark.datasources +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.spark.hbase._ /** @@ -25,9 +26,14 @@ import org.apache.hadoop.hbase.spark.hbase._ * @param b The byte array of the bound * @param inc inclusive or not. */ +@InterfaceAudience.Private case class Bound(b: Array[Byte], inc: Boolean) // The non-overlapping ranges we need to scan, if lower is equal to upper, it is a get request + +@InterfaceAudience.Private case class Range(lower: Option[Bound], upper: Option[Bound]) + +@InterfaceAudience.Private object Range { def apply(region: HBaseRegion): Range = { Range(region.start.map(Bound(_, true)), if (region.end.get.size == 0) { @@ -38,6 +44,7 @@ object Range { } } +@InterfaceAudience.Private object Ranges { // We assume that // 1. 
r.lower.inc is true, and r.upper.inc is false @@ -87,6 +94,7 @@ object Ranges { } } +@InterfaceAudience.Private object Points { def and(r: Range, ps: Seq[Array[Byte]]): Seq[Array[Byte]] = { ps.flatMap { p => diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala index 7e0a862e6a9..9fcd73e83eb 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.spark.datasources +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.spark.{HBaseConnectionKey, SmartConnection, @@ -28,10 +29,12 @@ import scala.language.implicitConversions // User has to invoke release explicitly to release the resource, // and potentially parent resources +@InterfaceAudience.Private trait Resource { def release(): Unit } +@InterfaceAudience.Private case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource { def release() { rs.close() @@ -39,12 +42,14 @@ case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource } } +@InterfaceAudience.Private case class GetResource(tbr: TableResource, rs: Array[Result]) extends Resource { def release() { tbr.release() } } +@InterfaceAudience.Private trait ReferencedResource { var count: Int = 0 def init(): Unit @@ -84,6 +89,7 @@ trait ReferencedResource { } } +@InterfaceAudience.Private case class TableResource(relation: HBaseRelation) extends ReferencedResource { var connection: SmartConnection = _ var table: Table = _ @@ -113,6 +119,7 @@ case class TableResource(relation: HBaseRelation) extends ReferencedResource { } } +@InterfaceAudience.Private case class RegionResource(relation: HBaseRelation) extends ReferencedResource { var connection: SmartConnection = _ var rl: RegionLocator = _ @@ -144,6 +151,7 @@ case class RegionResource(relation: HBaseRelation) extends ReferencedResource { } } +@InterfaceAudience.Private object HBaseResources{ implicit def ScanResToScan(sr: ScanResource): ResultScanner = { sr.rs diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala index bdef1a45332..0f20d1d883b 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala @@ -17,6 +17,11 @@ package org.apache.hadoop.hbase.spark.datasources +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving object HBaseSparkConf{ // This is the hbase configuration. 
User can either set them in SparkConf, which // will take effect globally, or configure it per table, which will overwrite the value diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala index 853ec9bfe8c..dcb5300c98c 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark.datasources import java.util.ArrayList +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.spark._ import org.apache.hadoop.hbase.spark.hbase._ @@ -30,6 +31,7 @@ import org.apache.spark.rdd.RDD import scala.collection.mutable +@InterfaceAudience.Private class HBaseTableScanRDD(relation: HBaseRelation, val hbaseContext: HBaseContext, @transient val filter: Option[SparkSQLPushDownFilter] = None, diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala index 851fb66b63f..e9ba8e7eb98 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala @@ -17,6 +17,9 @@ package org.apache.hadoop.hbase.spark.datasources +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.Logging @@ -29,6 +32,8 @@ import org.apache.spark.sql.types._ * @param low: the lower bound of the range. * @param upper: the upper bound of the range. */ +@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK)) +@InterfaceStability.Evolving case class BoundRange(low: Array[Byte],upper: Array[Byte]) /** @@ -40,6 +45,8 @@ case class BoundRange(low: Array[Byte],upper: Array[Byte]) * @param greater: the set of ranges for GreaterThan/GreaterThanOrEqualTo * @param value: the byte array of the original value */ +@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK)) +@InterfaceStability.Evolving case class BoundRanges(less: Array[BoundRange], greater: Array[BoundRange], value: Array[Byte]) /** @@ -47,6 +54,8 @@ case class BoundRanges(less: Array[BoundRange], greater: Array[BoundRange], valu * encode is used for serializing the data type to byte array and the filter is * used to filter out the unnecessary records. 
*/ +@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK)) +@InterfaceStability.Evolving trait BytesEncoder { def encode(dt: DataType, value: Any): Array[Byte] @@ -83,6 +92,8 @@ trait BytesEncoder { def ranges(in: Any): Option[BoundRanges] } +@InterfaceAudience.LimitedPrivate(Array(HBaseInterfaceAudience.SPARK)) +@InterfaceStability.Evolving object JavaBytesEncoder extends Enumeration with Logging{ type JavaBytesEncoder = Value val Greater, GreaterEqual, Less, LessEqual, Equal, Unknown = Value diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala index 3137717d688..64868967556 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala @@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.spark.datasources * limitations under the License. */ +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.util.Bytes @@ -31,6 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String * can work correctly, which is done by wrapping the type into the first byte * of the serialized array. */ +@InterfaceAudience.Private class NaiveEncoder extends BytesEncoder with Logging{ var code = 0 def nextCode: Byte = { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala index e7acb587ee3..a7c298a6d4a 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala @@ -30,6 +30,7 @@ import org.apache.avro.SchemaBuilder.FieldDefault import org.apache.avro.SchemaBuilder.RecordBuilder import org.apache.avro.io._ import org.apache.commons.io.output.ByteArrayOutputStream +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes import scala.collection.JavaConversions._ @@ -43,8 +44,10 @@ import org.apache.spark.sql.types._ import scala.collection.immutable.Map - +@InterfaceAudience.Private abstract class AvroException(msg: String) extends Exception(msg) + +@InterfaceAudience.Private case class SchemaConversionException(msg: String) extends AvroException(msg) /*** @@ -55,6 +58,7 @@ case class SchemaConversionException(msg: String) extends AvroException(msg) * 3. convertTypeToAvro: This function constructs converter function for a given sparkSQL * datatype. 
This is used in writing Avro records out to disk */ +@InterfaceAudience.Private object SchemaConverters { case class SchemaType(dataType: DataType, nullable: Boolean) @@ -393,7 +397,7 @@ object SchemaConverters { } } - +@InterfaceAudience.Private object AvroSerdes { // We only handle top level is record or primary type now def serialize(input: Any, schema: Schema): Array[Byte]= { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala index a19099b90ff..98cc8719c90 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala @@ -28,14 +28,15 @@ import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericR import org.apache.avro.io._ import org.apache.commons.io.output.ByteArrayOutputStream import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.sql.types._ +// TODO: This is not really used in code. trait SerDes { def serialize(value: Any): Array[Byte] def deserialize(bytes: Array[Byte], start: Int, end: Int): Any } +// TODO: This is not really used in code. class DoubleSerDes extends SerDes { override def serialize(value: Any): Array[Byte] = Bytes.toBytes(value.asInstanceOf[Double]) override def deserialize(bytes: Array[Byte], start: Int, end: Int): Any = { diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala index 42a5c3218b7..46ae125a4ad 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.spark.datasources import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.classification.InterfaceAudience; import scala.util.control.NonFatal +@InterfaceAudience.Private class SerializableConfiguration(@transient var value: Configuration) extends Serializable { private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException { out.defaultWriteObject() diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala index 1e56a3d2006..3df23f958ee 100644 --- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala +++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.SqlLexical import org.apache.spark.sql.catalyst.util.DataTypeParser import org.apache.spark.sql.types.DataType +// TODO: Only used in test suite. 
object DataTypeParserWrapper { lazy val dataTypeParser = new DataTypeParser { override val lexical = new SqlLexical diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala index bb9a94b6dff..6515fe99b89 100644 --- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala +++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.datasources.hbase import org.apache.avro.Schema +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.spark.SchemaConverters import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.spark.hbase._ @@ -32,6 +34,7 @@ import scala.collection.mutable // Due the access issue defined in spark, we have to locate the file in this package. // The definition of each column cell, which may be composite type // TODO: add avro support +@InterfaceAudience.Private case class Field( colName: String, cf: String, @@ -112,6 +115,7 @@ case class Field( // The row key definition, with each key refer to the col defined in Field, e.g., // key1:key2:key3 +@InterfaceAudience.Private case class RowKey(k: String) { val keys = k.split(":") var fields: Seq[Field] = _ @@ -127,6 +131,7 @@ case class RowKey(k: String) { } } // The map between the column presented to Spark and the HBase field +@InterfaceAudience.Private case class SchemaMap(map: mutable.HashMap[String, Field]) { def toFields = map.map { case (name, field) => StructField(name, field.dt) @@ -139,6 +144,7 @@ case class SchemaMap(map: mutable.HashMap[String, Field]) { // The definition of HBase and Relation relation schema +@InterfaceAudience.Private case class HBaseTableCatalog( namespace: String, name: String, @@ -203,6 +209,8 @@ case class HBaseTableCatalog( initRowKey } +@InterfaceAudience.Public +@InterfaceStability.Evolving object HBaseTableCatalog { // If defined and larger than 3, a new table will be created with the nubmer of region specified. val newTable = "newtable" @@ -286,6 +294,7 @@ object HBaseTableCatalog { |}""".stripMargin */ @deprecated("Please use new json format to define HBaseCatalog") + // TODO: There is no need to deprecate since this is the first release. def convert(parameters: Map[String, String]): Map[String, String] = { val tableName = parameters.get(TABLE_KEY).getOrElse(null) // if the hbase.table is not defined, we assume it is json format already. 
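
HBaseTableCatalog, which the hunks above mark @InterfaceAudience.Public and @InterfaceStability.Evolving, is driven by a JSON catalog string that maps Spark SQL columns to HBase column families and qualifiers, with the reserved "rowkey" family used for row key fields. A hedged illustration of the expected shape, following the module's documented format rather than text copied from this patch:

  // Hypothetical catalog for a table default:person with a string row key
  // and two value columns in column family "info".
  val catalog =
    s"""{
       |"table":{"namespace":"default", "name":"person"},
       |"rowkey":"key",
       |"columns":{
       |  "id":{"cf":"rowkey", "col":"key", "type":"string"},
       |  "name":{"cf":"info", "col":"name", "type":"string"},
       |  "age":{"cf":"info", "col":"age", "type":"int"}
       |}
       |}""".stripMargin

The optional "newtable" parameter shown in the same object can be passed alongside the catalog to have the data source create the table with the requested number of regions.
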
@@ -319,6 +328,7 @@ object HBaseTableCatalog { * @param schemaMappingString The schema mapping string from the SparkSQL map * @return A map of definitions keyed by the SparkSQL column name */ + @InterfaceAudience.Private def generateSchemaMappingMap(schemaMappingString:String): java.util.HashMap[String, SchemaQualifierDefinition] = { println(schemaMappingString) @@ -362,6 +372,7 @@ object HBaseTableCatalog { * @param columnFamily HBase column family * @param qualifier HBase qualifier name */ +@InterfaceAudience.Private case class SchemaQualifierDefinition(columnName:String, colType:String, columnFamily:String, diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala index 73d054d10bf..6f339cd17c7 100644 --- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala +++ b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala @@ -24,6 +24,9 @@ import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private object Utils {
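
The types promoted to @InterfaceAudience.Public / @InterfaceStability.Evolving in this patch (HBaseContext, HBaseRDDFunctions, HBaseDStreamFunctions, JavaHBaseContext, and the bulk-load helpers) are the surface users code against. A short sketch of that surface for an RDD write path, assuming a live SparkContext and a default HBase configuration; table and column names are placeholders:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.spark.HBaseContext
  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.SparkContext

  def bulkPutExample(sc: SparkContext): Unit = {
    val config: Configuration = HBaseConfiguration.create()
    // HBaseContext broadcasts the configuration and manages Connection life cycles.
    val hbaseContext = new HBaseContext(sc, config)

    val rdd = sc.parallelize(Seq(
      (Bytes.toBytes("row1"), Bytes.toBytes("value1")),
      (Bytes.toBytes("row2"), Bytes.toBytes("value2"))))

    // hbaseBulkPut is an implicit from HBaseRDDFunctions: each record becomes a Put.
    rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), r => {
      val put = new Put(r._1)
      put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("q"), r._2)
      put
    })
  }

The Java-facing JavaHBaseContext wraps the same HBaseContext, so keeping the two classes at the same audience level is what lets Java and Scala users share one supported API.
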