diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala index 022c933d479..ab4fc41f007 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala @@ -33,8 +33,8 @@ import org.apache.spark.Partitioner @InterfaceAudience.Public class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) extends Partitioner { - // when table not exist, startKeys = Byte[0][] - override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length + + override def numPartitions: Int = startKeys.length override def getPartition(key: Any): Int = { @@ -53,11 +53,8 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) case _ => key.asInstanceOf[Array[Byte]] } - var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) - if (partition < 0) - partition = partition * -1 + -2 - if (partition < 0) - partition = 0 - partition + val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) + if (partition < 0) partition * -1 + -2 + else partition } } diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala index 8c4e0f46a39..e2891dba3dc 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala @@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream import java.io._ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod -import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem} +import org.apache.hadoop.fs.{Path, FileSystem} import scala.collection.mutable /** @@ -620,17 +620,9 @@ class HBaseContext(@transient sc: SparkContext, compactionExclude: Boolean = false, maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = { - val stagingPath = new Path(stagingDir) - val fs = stagingPath.getFileSystem(config) - if (fs.exists(stagingPath)) { - throw new FileAlreadyExistsException("Path " + stagingDir + " already exist") - } val conn = HBaseConnectionCache.getConnection(config) val regionLocator = conn.getRegionLocator(tableName) val startKeys = regionLocator.getStartKeys - if (startKeys.length == 0) { - logInfo("Table " + tableName.toString + " was not found") - } val defaultCompressionStr = config.get("hfile.compression", Compression.Algorithm.NONE.getName) val hfileCompression = HFileWriterImpl @@ -751,17 +743,9 @@ class HBaseContext(@transient sc: SparkContext, compactionExclude: Boolean = false, maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = { - val stagingPath = new Path(stagingDir) - val fs = stagingPath.getFileSystem(config) - if (fs.exists(stagingPath)) { - throw new FileAlreadyExistsException("Path " + stagingDir + " already exist") - } val conn = HBaseConnectionCache.getConnection(config) val regionLocator = conn.getRegionLocator(tableName) val startKeys = regionLocator.getStartKeys - if (startKeys.length == 0) { - logInfo("Table " + tableName.toString + " was not found") - } val defaultCompressionStr = config.get("hfile.compression", Compression.Algorithm.NONE.getName) val defaultCompression = HFileWriterImpl