HBASE-14150 Add BulkLoad functionality to HBase-Spark Module (Ted Malaska)
This commit is contained in:
parent aa3538f802
commit 72a48a1333
@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.spark

import java.util
import java.util.Comparator

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Partitioner

/**
 * A Partitioner implementation that will separate records to different
 * HBase Regions based on region splits
 *
 * @param startKeys The start keys for the given table
 */
class BulkLoadPartitioner(startKeys: Array[Array[Byte]])
  extends Partitioner {

  override def numPartitions: Int = startKeys.length

  override def getPartition(key: Any): Int = {

    val rowKey: Array[Byte] =
      key match {
        case qualifier: KeyFamilyQualifier =>
          qualifier.rowKey
        case _ =>
          key.asInstanceOf[Array[Byte]]
      }

    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
        Bytes.compareTo(o1, o2)
      }
    }
    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
    if (partition < 0) partition * -1 + -2
    else partition
  }
}
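As a quick illustration of the index arithmetic above (a sketch, not part of the patch): java.util.Arrays.binarySearch returns -(insertionPoint) - 1 when the row key is not itself a region start key, so partition * -1 + -2 recovers insertionPoint - 1, the index of the region whose start key precedes the row key. Using the split keys exercised by the partitioner test at the bottom of this commit:

  val splitKeys = Array(Bytes.toBytes(""), Bytes.toBytes("3"), Bytes.toBytes("7"))
  val partitioner = new BulkLoadPartitioner(splitKeys)
  partitioner.getPartition(Bytes.toBytes("1"))  // miss, insertion point 1 -> partition 0
  partitioner.getPartition(Bytes.toBytes("3"))  // exact hit at index 1   -> partition 1
  partitioner.getPartition(Bytes.toBytes("8"))  // miss, insertion point 3 -> partition 2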
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.spark

import java.io.Serializable

/**
 * This object will hold optional data for how a given column family's
 * writer will work
 *
 * @param compression       String to define the Compression to be used in the HFile
 * @param bloomType         String to define the bloom type to be used in the HFile
 * @param blockSize         The block size to be used in the HFile
 * @param dataBlockEncoding String to define the data block encoding to be used
 *                          in the HFile
 */
class FamilyHFileWriteOptions(val compression: String,
                              val bloomType: String,
                              val blockSize: Int,
                              val dataBlockEncoding: String) extends Serializable
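For reference, a small sketch (not part of the patch) of how these options are attached to a column family before calling bulkLoad; the values mirror the custom-config test further down in this commit:

  // GZ compression, ROW bloom filter, 128-byte blocks, PREFIX data block encoding for family "f1"
  val familyOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
  familyOptions.put(Bytes.toBytes("f1"), new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX"))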
@@ -17,31 +17,35 @@

package org.apache.hadoop.hbase.spark

import org.apache.hadoop.hbase.TableName
import java.net.InetSocketAddress
import java.util

import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._
import scala.reflect.ClassTag
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Delete
import org.apache.spark.{Logging, SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
  TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Mutation
import org.apache.spark.streaming.dstream.DStream
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
import org.apache.hadoop.fs.{Path, FileSystem}
import scala.collection.mutable

/**
 * HBaseContext is a façade for HBase operations
@@ -567,4 +571,270 @@ class HBaseContext(@transient sc: SparkContext,
   */
  private[spark]
  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

  /**
   * A Spark Implementation of HBase Bulk load
   *
   * This will take the content from an existing RDD then sort and shuffle
   * it with respect to region splits. The result of that sort and shuffle
   * will be written to HFiles.
   *
   * After this function is executed the user will have to call
   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
   *
   * Also note this version of bulk load is different from past versions in
   * that it includes the qualifier as part of the sort process. The
   * reason for this is to be able to support rows with a very large number
   * of columns.
   *
   * @param rdd                        The RDD we are bulk loading from
   * @param tableName                  The HBase table we are loading into
   * @param flatMap                    A flatMap function that will make every
   *                                   row in the RDD
   *                                   into N cells for the bulk load
   * @param stagingDir                 The location on the FileSystem to bulk load into
   * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
   *                                   column family is written
   * @param compactionExclude          Whether to exclude the resulting HFiles from minor compaction
   * @param maxSize                    Max size for the HFiles before they roll
   * @tparam T                         The Type of values in the original RDD
   */
  def bulkLoad[T](rdd: RDD[T],
                  tableName: TableName,
                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                  stagingDir: String,
                  familyHFileWriteOptionsMap:
                    util.Map[Array[Byte], FamilyHFileWriteOptions] =
                      new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
  Unit = {
    val conn = ConnectionFactory.createConnection(config)
    val regionLocator = conn.getRegionLocator(tableName)
    val startKeys = regionLocator.getStartKeys
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.NONE.getName)
    val defaultCompression = HFileWriterImpl
      .compressionByName(defaultCompressionStr)
    val now = System.currentTimeMillis()
    val tableNameByteArray = tableName.getName

    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    /**
     * This will return a new HFile writer when requested
     *
     * @param family       column family
     * @param conf         configuration to connect to HBase
     * @param favoredNodes nodes that we would like to write to
     * @param fs           FileSystem object where we will be writing the HFiles to
     * @return             WriterLength object
     */
    def getNewWriter(family: Array[Byte], conf: Configuration,
                     favoredNodes: Array[InetSocketAddress],
                     fs: FileSystem,
                     familydir: Path): WriterLength = {

      var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))

      if (familyOptions == null) {
        familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
      }

      val tempConf = new Configuration(conf)
      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
      val contextBuilder = new HFileContextBuilder()
        .withCompression(Algorithm.valueOf(familyOptions.compression))
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(familyOptions.blockSize)
      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
        valueOf(familyOptions.dataBlockEncoding))
      val hFileContext = contextBuilder.build()

      if (null == favoredNodes) {
        new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
      } else {
        new WriterLength(0,
          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
            .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build())
      }
    }

    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    //This is where all the magic happens
    //Here we are going to do the following things
    // 1. FlatMap every row in the RDD into key column value tuples
    // 2. Then we are going to repartition sort and shuffle
    // 3. Finally we are going to write out our HFiles
    rdd.flatMap( r => flatMap(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {

      val conf = broadcastedConf.value.value
      val fs = FileSystem.get(conf)
      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
      var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
      var rollOverRequested = false

      /**
       * This will roll all writers
       */
      def rollWriters(): Unit = {
        writerMap.values.foreach( wl => {
          if (wl.writer != null) {
            logDebug("Writer=" + wl.writer.getPath +
              (if (wl.written == 0) "" else ", wrote=" + wl.written))
            close(wl.writer)
          }
        })
        writerMap.clear()
        rollOverRequested = false
      }

      /**
       * This function will close a given HFile writer
       * @param w The writer to close
       */
      def close(w: StoreFile.Writer): Unit = {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
            Bytes.toBytes(System.currentTimeMillis()))
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
            Bytes.toBytes(true))
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
            Bytes.toBytes(compactionExclude))
          w.appendTrackedTimestampsToMetadata()
          w.close()
        }
      }

      //Here is where we finally iterate through the data in this partition of the
      //RDD that has been sorted and partitioned
      it.foreach{ case (keyFamilyQualifier, cellValue: Array[Byte]) =>

        //This will get a writer for the column family
        //If there is no writer for a given column family then
        //it will get created here.
        val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {

          val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))

          fs.mkdirs(familyDir)

          val loc: HRegionLocation = {
            try {
              val locator =
                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
              locator.getRegionLocation(keyFamilyQualifier.rowKey)
            } catch {
              case e: Throwable =>
                logWarning("there's something wrong when locating rowkey: " +
                  Bytes.toString(keyFamilyQualifier.rowKey))
                null
            }
          }
          if (null == loc) {
            if (log.isTraceEnabled) {
              logTrace("failed to get region location, so use default writer: " +
                Bytes.toString(keyFamilyQualifier.rowKey))
            }
            getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
              fs = fs, familydir = familyDir)
          } else {
            if (log.isDebugEnabled) {
              logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
            }
            val initialIsa =
              new InetSocketAddress(loc.getHostname, loc.getPort)
            if (initialIsa.isUnresolved) {
              if (log.isTraceEnabled) {
                logTrace("failed to resolve bind address: " + loc.getHostname + ":"
                  + loc.getPort + ", so use default writer")
              }
              getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
            } else {
              if (log.isDebugEnabled) {
                logDebug("use favored nodes writer: " + initialIsa.getHostString)
              }
              getNewWriter(keyFamilyQualifier.family, conf,
                Array[InetSocketAddress](initialIsa), fs, familyDir)
            }
          }
        })

        val keyValue = new KeyValue(keyFamilyQualifier.rowKey,
          keyFamilyQualifier.family,
          keyFamilyQualifier.qualifier,
          now, cellValue)

        wl.writer.append(keyValue)
        wl.written += keyValue.getLength

        rollOverRequested = rollOverRequested || wl.written > maxSize

        //This will only roll if we have at least one column family file that is
        //bigger than maxSize and we have finished a given row key
        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
          rollWriters()
        }

        previousRow = keyFamilyQualifier.rowKey
      }
      //We have finished all the data so let's close up the writers
      rollWriters()
    })
  }

  /**
   * This is a wrapper class around StoreFile.Writer. The reason for the
   * wrapper is to keep the length of the file alongside the writer
   *
   * @param written The number of bytes written to the writer
   * @param writer  The wrapped StoreFile.Writer
   */
  class WriterLength(var written: Long, val writer: StoreFile.Writer)

  /**
   * This is a wrapper over a byte array so it can work as
   * a key in a hashMap
   *
   * @param o1 The Byte Array value
   */
  class ByteArrayWrapper(val o1: Array[Byte])
    extends Comparable[ByteArrayWrapper] with Serializable {
    override def compareTo(o2: ByteArrayWrapper): Int = {
      Bytes.compareTo(o1, o2.o1)
    }
    override def equals(o2: Any): Boolean = {
      o2 match {
        case wrapper: ByteArrayWrapper =>
          Bytes.equals(o1, wrapper.o1)
        case _ =>
          false
      }
    }
    override def hashCode(): Int = {
      Bytes.hashCode(o1)
    }
  }
}
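A condensed usage sketch of the new API, distilled from the test suite at the end of this commit (the RDD element type and the staging path here are illustrative assumptions):

  val hbaseContext = new HBaseContext(sc, config)

  // 1. Write HFiles for an RDD of (rowKey, family, qualifier, value) byte-array tuples.
  hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](rdd,
    TableName.valueOf("t1"),
    t => Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
    "/tmp/bulkLoadStaging")

  // 2. The generated HFiles still have to be handed to HBase afterwards.
  val conn = ConnectionFactory.createConnection(config)
  val table = conn.getTable(TableName.valueOf("t1"))
  new LoadIncrementalHFiles(config).doBulkLoad(new Path("/tmp/bulkLoadStaging"),
    conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf("t1")))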
@@ -17,11 +17,15 @@

package org.apache.hadoop.hbase.spark

import org.apache.hadoop.hbase.TableName
import java.util

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD

import scala.collection.immutable.HashMap
import scala.reflect.ClassTag

/**
@@ -158,5 +162,46 @@ object HBaseRDDFunctions
    RDD[R] = {
      hc.mapPartitions[T,R](rdd, f)
    }

    /**
     * Implicit method that gives easy access to HBaseContext's
     * bulkLoad method.
     *
     * A Spark Implementation of HBase Bulk load
     *
     * This will take the content from an existing RDD then sort and shuffle
     * it with respect to region splits. The result of that sort and shuffle
     * will be written to HFiles.
     *
     * After this function is executed the user will have to call
     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
     *
     * Also note this version of bulk load is different from past versions in
     * that it includes the qualifier as part of the sort process. The
     * reason for this is to be able to support rows with a very large number
     * of columns.
     *
     * @param tableName                  The HBase table we are loading into
     * @param flatMap                    A flatMap function that will make every row in the RDD
     *                                   into N cells for the bulk load
     * @param stagingDir                 The location on the FileSystem to bulk load into
     * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
     *                                   column family is written
     * @param compactionExclude          Whether to exclude the resulting HFiles from minor compaction
     * @param maxSize                    Max size for the HFiles before they roll
     */
    def hbaseBulkLoad(hc: HBaseContext,
                      tableName: TableName,
                      flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                      stagingDir: String,
                      familyHFileWriteOptionsMap:
                        util.Map[Array[Byte], FamilyHFileWriteOptions] =
                          new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
                      compactionExclude: Boolean = false,
                      maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
      hc.bulkLoad(rdd, tableName,
        flatMap, stagingDir, familyHFileWriteOptionsMap,
        compactionExclude, maxSize)
    }
  }
}
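With the implicit in scope, the same bulk load can hang off the RDD directly, as the roll-over test below does (the tuple shape and staging path here are illustrative):

  import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

  rdd.hbaseBulkLoad(hbaseContext,
    TableName.valueOf("t1"),
    t => Seq((new KeyFamilyQualifier(t._1, t._2, t._3), t._4)).iterator,
    "/tmp/bulkLoadStaging")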
@@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.spark

import java.io.Serializable

import org.apache.hadoop.hbase.util.Bytes

/**
 * This is the key to be used for sorting and shuffling.
 *
 * We will only partition on the rowKey but we will sort on all three
 *
 * @param rowKey    Record RowKey
 * @param family    Record ColumnFamily
 * @param qualifier Cell Qualifier
 */
class KeyFamilyQualifier(val rowKey: Array[Byte], val family: Array[Byte], val qualifier: Array[Byte])
  extends Comparable[KeyFamilyQualifier] with Serializable {
  override def compareTo(o: KeyFamilyQualifier): Int = {
    var result = Bytes.compareTo(rowKey, o.rowKey)
    if (result == 0) {
      result = Bytes.compareTo(family, o.family)
      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
    }
    result
  }
  override def toString: String = {
    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
  }
}
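The resulting ordering, in a small sketch (not part of the patch): cells compare by rowKey first, then family, then qualifier, while BulkLoadPartitioner only looks at the rowKey, so every cell of a row lands in the same partition but arrives family- and qualifier-sorted:

  val a = new KeyFamilyQualifier(Bytes.toBytes("1"), Bytes.toBytes("f1"), Bytes.toBytes("a"))
  val b = new KeyFamilyQualifier(Bytes.toBytes("1"), Bytes.toBytes("f1"), Bytes.toBytes("b"))
  val c = new KeyFamilyQualifier(Bytes.toBytes("2"), Bytes.toBytes("f1"), Bytes.toBytes("a"))
  assert(a.compareTo(b) < 0)  // same row and family, the qualifier decides
  assert(b.compareTo(c) < 0)  // a different row key decides first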
@@ -0,0 +1,537 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.spark

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.spark.{SparkContext, Logging}
import org.junit.rules.TemporaryFolder
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

class BulkLoadSuite extends FunSuite with
  BeforeAndAfterEach with BeforeAndAfterAll with Logging {
  @transient var sc: SparkContext = null
  var TEST_UTIL = new HBaseTestingUtility

  val tableName = "t1"
  val columnFamily1 = "f1"
  val columnFamily2 = "f2"
  val testFolder = new TemporaryFolder()

  override def beforeAll() {
    TEST_UTIL.startMiniCluster()
    logInfo(" - minicluster started")

    try {
      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
    } catch {
      case e: Exception =>
        logInfo(" - no table " + tableName + " found")
    }

    logInfo(" - created table")

    val envMap = Map[String,String](("Xmx", "512m"))

    sc = new SparkContext("local", "test", null, Nil, envMap)
  }

  override def afterAll() {
    logInfo("shutting down minicluster")
    TEST_UTIL.shutdownMiniCluster()
    logInfo(" - minicluster shut down")
    TEST_UTIL.cleanupTestDir()
    sc.stop()
  }

  test("Basic Test multi family and multi column tests " +
    "with all default HFile Configs") {
    val config = TEST_UTIL.getConfiguration

    logInfo(" - creating table " + tableName)
    TEST_UTIL.createTable(TableName.valueOf(tableName),
      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))

    //There are a number of tests in here.
    // 1. Row keys are not in order
    // 2. Qualifiers are not in order
    // 3. Column Families are not in order
    // 4. There are tests for records in one column family and some in two column families
    // 5. There are records with a single qualifier and some with two
    val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))),
      (Bytes.toBytes("5"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))

    val hbaseContext = new HBaseContext(sc, config)

    testFolder.create()
    val stagingFolder = testFolder.newFolder()

    hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family: Array[Byte] = t._2(0)._1
        val qualifier = t._2(0)._2
        val value = t._2(0)._3

        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder.getPath)

    val fs = FileSystem.get(config)
    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)

    val conn = ConnectionFactory.createConnection(config)

    val load = new LoadIncrementalHFiles(config)
    val table = conn.getTable(TableName.valueOf(tableName))
    try {
      load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
        conn.getRegionLocator(TableName.valueOf(tableName)))

      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
      assert(cells5.size == 1)
      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))

      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
      assert(cells4.size == 2)
      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))

      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
      assert(cells3.size == 3)
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))

      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
      assert(cells2.size == 2)
      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))

      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
      assert(cells1.size == 1)
      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))

    } finally {
      table.close()
      val admin = ConnectionFactory.createConnection(config).getAdmin
      try {
        admin.disableTable(TableName.valueOf(tableName))
        admin.deleteTable(TableName.valueOf(tableName))
      } finally {
        admin.close()
      }
      fs.delete(new Path(stagingFolder.getPath), true)

      testFolder.delete()

    }
  }

  test("bulkLoad to test HBase client: Test Roll Over and " +
    "using an implicit call to bulk load") {
    val config = TEST_UTIL.getConfiguration

    logInfo(" - creating table " + tableName)
    TEST_UTIL.createTable(TableName.valueOf(tableName),
      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))

    //There are a number of tests in here.
    // 1. Row keys are not in order
    // 2. Qualifiers are not in order
    // 3. Column Families are not in order
    // 4. There are tests for records in one column family and some in two column families
    // 5. There are records with a single qualifier and some with two
    val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c")))),
      (Bytes.toBytes("5"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))

    val hbaseContext = new HBaseContext(sc, config)

    testFolder.create()
    val stagingFolder = testFolder.newFolder()

    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family: Array[Byte] = t._2(0)._1
        val qualifier = t._2(0)._2
        val value = t._2(0)._3

        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder.getPath,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude = false,
      20)

    val fs = FileSystem.get(config)
    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)

    assert(fs.listStatus(new Path(stagingFolder.getPath + "/f1")).length == 5)

    val conn = ConnectionFactory.createConnection(config)

    val load = new LoadIncrementalHFiles(config)
    val table = conn.getTable(TableName.valueOf(tableName))
    try {
      load.doBulkLoad(new Path(stagingFolder.getPath),
        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
      assert(cells5.size == 1)
      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))

      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
      assert(cells4.size == 2)
      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))

      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
      assert(cells3.size == 3)
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))

      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
      assert(cells2.size == 2)
      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))

      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
      assert(cells1.size == 1)
      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))

    } finally {
      table.close()
      val admin = ConnectionFactory.createConnection(config).getAdmin
      try {
        admin.disableTable(TableName.valueOf(tableName))
        admin.deleteTable(TableName.valueOf(tableName))
      } finally {
        admin.close()
      }
      fs.delete(new Path(stagingFolder.getPath), true)

      testFolder.delete()
    }
  }

  test("Basic Test multi family and multi column tests" +
    " with one column family with custom configs plus multi region") {
    val config = TEST_UTIL.getConfiguration

    val splitKeys: Array[Array[Byte]] = new Array[Array[Byte]](2)
    splitKeys(0) = Bytes.toBytes("2")
    splitKeys(1) = Bytes.toBytes("4")

    logInfo(" - creating table " + tableName)
    TEST_UTIL.createTable(TableName.valueOf(tableName),
      Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
      splitKeys)

    //There are a number of tests in here.
    // 1. Row keys are not in order
    // 2. Qualifiers are not in order
    // 3. Column Families are not in order
    // 4. There are tests for records in one column family and some in two column families
    // 5. There are records with a single qualifier and some with two
    val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))),
      (Bytes.toBytes("5"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))),
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))),
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))))

    val hbaseContext = new HBaseContext(sc, config)

    testFolder.create()
    val stagingFolder = testFolder.newFolder()

    val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]

    val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,
      "PREFIX")

    familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)

    hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1
        val family: Array[Byte] = t._2(0)._1
        val qualifier = t._2(0)._2
        val value = t._2(0)._3

        val keyFamilyQualifier = new KeyFamilyQualifier(rowKey, family, qualifier)

        Seq((keyFamilyQualifier, value)).iterator
      },
      stagingFolder.getPath,
      familyHBaseWriterOptions,
      compactionExclude = false,
      HConstants.DEFAULT_MAX_FILE_SIZE)

    val fs = FileSystem.get(config)
    assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)

    val f1FileList = fs.listStatus(new Path(stagingFolder.getPath + "/f1"))
    for (i <- 0 until f1FileList.length) {
      val reader = HFile.createReader(fs, f1FileList(i).getPath,
        new CacheConfig(config), config)
      assert(reader.getCompressionAlgorithm.getName.equals("gz"))
      assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
    }

    assert(3 == f1FileList.length)

    val f2FileList = fs.listStatus(new Path(stagingFolder.getPath + "/f2"))
    for (i <- 0 until f2FileList.length) {
      val reader = HFile.createReader(fs, f2FileList(i).getPath,
        new CacheConfig(config), config)
      assert(reader.getCompressionAlgorithm.getName.equals("none"))
      assert(reader.getDataBlockEncoding.name().equals("NONE"))
    }

    assert(2 == f2FileList.length)

    val conn = ConnectionFactory.createConnection(config)

    val load = new LoadIncrementalHFiles(config)
    val table = conn.getTable(TableName.valueOf(tableName))
    try {
      load.doBulkLoad(new Path(stagingFolder.getPath),
        conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

      val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
      assert(cells5.size == 1)
      assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))

      val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
      assert(cells4.size == 2)
      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))

      val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
      assert(cells3.size == 3)
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))

      val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
      assert(cells2.size == 2)
      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
      assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))

      val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
      assert(cells1.size == 1)
      assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
      assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
      assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))

    } finally {
      table.close()
      val admin = ConnectionFactory.createConnection(config).getAdmin
      try {
        admin.disableTable(TableName.valueOf(tableName))
        admin.deleteTable(TableName.valueOf(tableName))
      } finally {
        admin.close()
      }
      fs.delete(new Path(stagingFolder.getPath), true)

      testFolder.delete()

    }
  }

  test("bulkLoad partitioner tests") {

    var splitKeys: Array[Array[Byte]] = new Array[Array[Byte]](3)
    splitKeys(0) = Bytes.toBytes("")
    splitKeys(1) = Bytes.toBytes("3")
    splitKeys(2) = Bytes.toBytes("7")

    var partitioner = new BulkLoadPartitioner(splitKeys)

    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
    assert(1 == partitioner.getPartition(Bytes.toBytes("3")))
    assert(1 == partitioner.getPartition(Bytes.toBytes("4")))
    assert(1 == partitioner.getPartition(Bytes.toBytes("6")))
    assert(2 == partitioner.getPartition(Bytes.toBytes("7")))
    assert(2 == partitioner.getPartition(Bytes.toBytes("8")))

    splitKeys = new Array[Array[Byte]](1)
    splitKeys(0) = Bytes.toBytes("")

    partitioner = new BulkLoadPartitioner(splitKeys)

    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("1")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("2")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("3")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("4")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("6")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("7")))

    splitKeys = new Array[Array[Byte]](7)
    splitKeys(0) = Bytes.toBytes("")
    splitKeys(1) = Bytes.toBytes("02")
    splitKeys(2) = Bytes.toBytes("04")
    splitKeys(3) = Bytes.toBytes("06")
    splitKeys(4) = Bytes.toBytes("08")
    splitKeys(5) = Bytes.toBytes("10")
    splitKeys(6) = Bytes.toBytes("12")

    partitioner = new BulkLoadPartitioner(splitKeys)

    assert(0 == partitioner.getPartition(Bytes.toBytes("")))
    assert(0 == partitioner.getPartition(Bytes.toBytes("01")))
    assert(1 == partitioner.getPartition(Bytes.toBytes("02")))
    assert(1 == partitioner.getPartition(Bytes.toBytes("03")))
    assert(2 == partitioner.getPartition(Bytes.toBytes("04")))
    assert(2 == partitioner.getPartition(Bytes.toBytes("05")))
    assert(3 == partitioner.getPartition(Bytes.toBytes("06")))
    assert(3 == partitioner.getPartition(Bytes.toBytes("07")))
    assert(4 == partitioner.getPartition(Bytes.toBytes("08")))
    assert(4 == partitioner.getPartition(Bytes.toBytes("09")))
    assert(5 == partitioner.getPartition(Bytes.toBytes("10")))
    assert(5 == partitioner.getPartition(Bytes.toBytes("11")))
    assert(6 == partitioner.getPartition(Bytes.toBytes("12")))
    assert(6 == partitioner.getPartition(Bytes.toBytes("13")))

  }

}