HBASE-17905: [hbase-spark] bulkload does not work when table not exist

Signed-off-by: tedyu <yuzhihong@gmail.com>
Yi Liang 2017-04-11 15:30:13 -07:00 committed by tedyu
parent 0b5bd78d6e
commit 22f602cab5
2 changed files with 25 additions and 6 deletions

hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala

@@ -33,8 +33,8 @@ import org.apache.spark.Partitioner
 
 @InterfaceAudience.Public
 class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
   extends Partitioner {
-
-  override def numPartitions: Int = startKeys.length
+  // when table not exist, startKeys = Byte[0][]
+  override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
 
   override def getPartition(key: Any): Int = {
@@ -53,8 +53,11 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
       case _ =>
         key.asInstanceOf[Array[Byte]]
     }
-    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
-    if (partition < 0) partition * -1 + -2
-    else partition
+    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0)
+      partition = partition * -1 + -2
+    if (partition < 0)
+      partition = 0
+    partition
   }
 }
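
Why the clamp matters: java.util.Arrays.binarySearch over an empty startKeys array returns -(insertionPoint) - 1 = -1, and the old expression partition * -1 + -2 maps -1 back to -1, an invalid partition id that Spark's shuffle rejects. A minimal standalone sketch of the arithmetic, assuming a hypothetical demo object and row key (neither is part of the patch):

import java.util
import java.util.Comparator

import org.apache.hadoop.hbase.util.Bytes

// Hypothetical demo, not part of the commit: reproduces the partition
// arithmetic from BulkLoadPartitioner.getPartition.
object PartitionMathDemo {
  val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
    override def compare(left: Array[Byte], right: Array[Byte]): Int =
      Bytes.compareTo(left, right)
  }

  def main(args: Array[String]): Unit = {
    val startKeys: Array[Array[Byte]] = Array.empty // table does not exist
    val rowKey = Bytes.toBytes("row-1")             // any row key

    var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
    // binarySearch on an empty array: -(insertionPoint) - 1 = -1
    if (partition < 0)
      partition = partition * -1 + -2 // -1 * -1 + -2 = -1, still negative
    if (partition < 0)
      partition = 0                   // the fix: fall back to partition 0
    println(partition)                // prints 0 instead of the old -1
  }
}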

hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala

@@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream
 import java.io._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
 import scala.collection.mutable
 
 /**
@@ -620,9 +620,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val hfileCompression = HFileWriterImpl
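
The staging-directory guard added above (and repeated in the second overload below) means a caller can no longer reuse a staging path across runs. A self-contained sketch of the failure mode it prevents, assuming a hypothetical demo object and an example /tmp path (neither is from the commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}

// Hypothetical demo, not part of the commit: shows the fail-fast behavior
// a caller now sees when the staging directory is left over from a prior run.
object StagingDirGuardDemo {
  def main(args: Array[String]): Unit = {
    val config = new Configuration()
    val stagingDir = "/tmp/hbase-bulkload-staging" // example path
    val stagingPath = new Path(stagingDir)
    val fs = stagingPath.getFileSystem(config)

    fs.mkdirs(stagingPath) // simulate a leftover staging directory
    try {
      // same check the patch adds at the top of both bulk load overloads
      if (fs.exists(stagingPath)) {
        throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
      }
    } catch {
      case e: FileAlreadyExistsException =>
        // caller must delete the directory or choose a fresh one
        println("bulk load aborted: " + e.getMessage)
    } finally {
      fs.delete(stagingPath, true)
    }
  }
}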
@@ -743,9 +751,17 @@ class HBaseContext(@transient sc: SparkContext,
                   compactionExclude: Boolean = false,
                   maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
   Unit = {
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(config)
+    if (fs.exists(stagingPath)) {
+      throw new FileAlreadyExistsException("Path " + stagingDir + " already exist")
+    }
     val conn = HBaseConnectionCache.getConnection(config)
     val regionLocator = conn.getRegionLocator(tableName)
     val startKeys = regionLocator.getStartKeys
+    if (startKeys.length == 0) {
+      logInfo("Table " + tableName.toString + " was not found")
+    }
     val defaultCompressionStr = config.get("hfile.compression",
       Compression.Algorithm.NONE.getName)
     val defaultCompression = HFileWriterImpl
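
For context, a hedged driver sketch showing how the patched bulk load method is typically invoked; the table name, column family, qualifier, row data, and staging path are all assumptions for illustration, not from the commit. With this patch, a missing table yields a single-partition load plus the info log above, and a leftover staging directory fails fast:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver, not part of the commit: writes two cells as HFiles
// into a staging directory via HBaseContext.bulkLoad.
object BulkLoadDriver {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bulkload-driver"))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    val rdd = sc.parallelize(Seq(
      (Bytes.toBytes("row1"), Bytes.toBytes("v1")),
      (Bytes.toBytes("row2"), Bytes.toBytes("v2"))))

    hbaseContext.bulkLoad[(Array[Byte], Array[Byte])](
      rdd,
      TableName.valueOf("exampleTable"),  // assumed table name
      t => Iterator((new KeyFamilyQualifier(
        t._1, Bytes.toBytes("cf"), Bytes.toBytes("q")), t._2)),
      "/tmp/bulkload-staging")            // must not already exist

    sc.stop()
  }
}

Completing the load, i.e. handing the staging directory to LoadIncrementalHFiles, is unchanged by this commit.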