From d7ddc79198679d8c642e7d8ad5141ba518f8d9f3 Mon Sep 17 00:00:00 2001
From: Yi Liang
Date: Tue, 11 Apr 2017 17:04:40 -0700
Subject: [PATCH] HBASE-17905 [hbase-spark] bulkload does not work when table not exist

Signed-off-by: tedyu
---
 .../hbase/spark/BulkLoadPartitioner.scala | 13 ++++++++-----
 .../hadoop/hbase/spark/HBaseContext.scala | 18 +++++++++++++++++-
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
index ab4fc41f007..022c933d479 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
@@ -33,8 +33,8 @@ import org.apache.spark.Partitioner
 @InterfaceAudience.Public
 class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
   extends Partitioner {
-
-  override def numPartitions: Int = startKeys.length
+  // when table not exist, startKeys = Byte[0][]
+  override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
 
   override def getPartition(key: Any): Int = {
 
@@ -53,8 +53,11 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
       case _ =>
         key.asInstanceOf[Array[Byte]]
     }
-    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
-    if (partition < 0) partition * -1 + -2
-    else partition
+    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0)
+      partition = partition * -1 + -2
+    if (partition < 0)
+      partition = 0
+    partition
   }
 }
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index e2891dba3dc..1948bd39873 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream
 import java.io._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
 import scala.collection.mutable
 
 /**
@@ -620,9 +620,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val hfileCompression = HFileWriterImpl
@@ -743,9 +751,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val defaultCompression = HFileWriterImpl
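
For illustration, here is a self-contained sketch of the partitioning behavior this patch fixes. When the target table does not exist, regionLocator.getStartKeys returns an empty array, so the old BulkLoadPartitioner reported numPartitions = 0 and getPartition could return -1, which Spark rejects as an invalid partition id; the patched logic clamps the result to 0 instead. The object and method names below (PartitionSketch, partitionFor, compareBytes) are hypothetical, and compareBytes is a stand-in for HBase's Bytes.ByteArrayComparator; only the clamping arithmetic is taken from the patch.

import java.util

object PartitionSketch {

  // Stand-in for org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator
  // (assumption, not the patch's own code): unsigned lexicographic
  // comparison of two byte arrays.
  private def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    var i = 0
    while (i < len) {
      val diff = (a(i) & 0xff) - (b(i) & 0xff)
      if (diff != 0) return diff
      i += 1
    }
    a.length - b.length
  }

  private val comparator = new java.util.Comparator[Array[Byte]] {
    override def compare(a: Array[Byte], b: Array[Byte]): Int = compareBytes(a, b)
  }

  // Same arithmetic as the patched getPartition: binarySearch returns
  // -(insertionPoint) - 1 for a missing key, so partition * -1 + -2
  // recovers insertionPoint - 1, i.e. the region whose start key precedes
  // the row key. The second check clamps -1 (empty startKeys for a
  // missing table, or a row key sorting before every start key) to 0.
  def partitionFor(startKeys: Array[Array[Byte]], rowKey: Array[Byte]): Int = {
    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
    if (partition < 0)
      partition = partition * -1 + -2
    if (partition < 0)
      partition = 0
    partition
  }

  def main(args: Array[String]): Unit = {
    val missingTable = Array.empty[Array[Byte]]               // table does not exist
    val twoRegions = Array(Array.empty[Byte], "m".getBytes("UTF-8"))
    println(partitionFor(missingTable, "row1".getBytes("UTF-8")))  // 0 (was -1 before the fix)
    println(partitionFor(twoRegions, "apple".getBytes("UTF-8")))   // 0
    println(partitionFor(twoRegions, "zebra".getBytes("UTF-8")))   // 1
  }
}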