HBASE-14340 Add second bulk load option to Spark Bulk Load to send puts as the value (Ted Malaska)

This commit is contained in:
Andrew Purtell 2015-11-17 13:48:47 -08:00
parent dadfe7da04
commit ca1048415b
6 changed files with 1014 additions and 222 deletions

View File

@ -36,19 +36,21 @@ class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
override def getPartition(key: Any): Int = { override def getPartition(key: Any): Int = {
val rowKey:Array[Byte] =
key match {
case qualifier: KeyFamilyQualifier =>
qualifier.rowKey
case _ =>
key.asInstanceOf[Array[Byte]]
}
val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] { val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
override def compare(o1: Array[Byte], o2: Array[Byte]): Int = { override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
Bytes.compareTo(o1, o2) Bytes.compareTo(o1, o2)
} }
} }
val rowKey:Array[Byte] =
key match {
case qualifier: KeyFamilyQualifier =>
qualifier.rowKey
case wrapper: ByteArrayWrapper =>
wrapper.value
case _ =>
key.asInstanceOf[Array[Byte]]
}
val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
if (partition < 0) partition * -1 + -2 if (partition < 0) partition * -1 + -2
else partition else partition

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.io.Serializable
import org.apache.hadoop.hbase.util.Bytes
/**
* This is a wrapper over a byte array so it can work as
* a key in a hashMap
*
* @param value The Byte Array value
*/
class ByteArrayWrapper (var value:Array[Byte])
extends Comparable[ByteArrayWrapper] with Serializable {
override def compareTo(valueOther: ByteArrayWrapper): Int = {
Bytes.compareTo(value,valueOther.value)
}
override def equals(o2: Any): Boolean = {
o2 match {
case wrapper: ByteArrayWrapper =>
Bytes.equals(value, wrapper.value)
case _ =>
false
}
}
override def hashCode():Int = {
Bytes.hashCode(value)
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.util
/**
* This object is a clean way to store and sort all cells that will be bulk
* loaded into a single row
*/
class FamiliesQualifiersValues extends Serializable {
//Tree maps are used because we need the results to
// be sorted when we read them
val familyMap = new util.TreeMap[ByteArrayWrapper,
util.TreeMap[ByteArrayWrapper, Array[Byte]]]()
//normally in a row there are more columns then
//column families this wrapper is reused for column
//family look ups
val reusableWrapper = new ByteArrayWrapper(null)
/**
* Adds a new cell to an existing row
* @param family HBase column family
* @param qualifier HBase column qualifier
* @param value HBase cell value
*/
def += (family: Array[Byte], qualifier: Array[Byte], value: Array[Byte]): Unit = {
reusableWrapper.value = family
var qualifierValues = familyMap.get(reusableWrapper)
if (qualifierValues == null) {
qualifierValues = new util.TreeMap[ByteArrayWrapper, Array[Byte]]()
familyMap.put(new ByteArrayWrapper(family), qualifierValues)
}
qualifierValues.put(new ByteArrayWrapper(qualifier), value)
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util import java.util
import javax.management.openmbean.KeyAlreadyExistsException
import org.apache.hadoop.hbase.fs.HFileSystem import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase._
@ -575,7 +576,8 @@ class HBaseContext(@transient sc: SparkContext,
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
/** /**
* A Spark Implementation of HBase Bulk load * Spark Implementation of HBase Bulk load for wide rows or when
* values are not already combined at the time of the map process
* *
* This will take the content from an existing RDD then sort and shuffle * This will take the content from an existing RDD then sort and shuffle
* it with respect to region splits. The result of that sort and shuffle * it with respect to region splits. The result of that sort and shuffle
@ -616,10 +618,10 @@ class HBaseContext(@transient sc: SparkContext,
val startKeys = regionLocator.getStartKeys val startKeys = regionLocator.getStartKeys
val defaultCompressionStr = config.get("hfile.compression", val defaultCompressionStr = config.get("hfile.compression",
Compression.Algorithm.NONE.getName) Compression.Algorithm.NONE.getName)
val defaultCompression = HFileWriterImpl val hfileCompression = HFileWriterImpl
.compressionByName(defaultCompressionStr) .compressionByName(defaultCompressionStr)
val now = System.currentTimeMillis() val nowTimeStamp = System.currentTimeMillis()
val tableNameByteArray = tableName.getName val tableRawName = tableName.getName
val familyHFileWriteOptionsMapInternal = val familyHFileWriteOptionsMapInternal =
new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions] new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
@ -631,53 +633,6 @@ class HBaseContext(@transient sc: SparkContext,
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue) familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
} }
/**
* This will return a new HFile writer when requested
*
* @param family column family
* @param conf configuration to connect to HBase
* @param favoredNodes nodes that we would like to write too
* @param fs FileSystem object where we will be writing the HFiles to
* @return WriterLength object
*/
def getNewWriter(family: Array[Byte], conf: Configuration,
favoredNodes: Array[InetSocketAddress],
fs:FileSystem,
familydir:Path): WriterLength = {
var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
if (familyOptions == null) {
familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
}
val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(familyOptions.blockSize)
contextBuilder.withDataBlockEncoding(DataBlockEncoding.
valueOf(familyOptions.dataBlockEncoding))
val hFileContext = contextBuilder.build()
if (null == favoredNodes) {
new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
} else {
new WriterLength(0,
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build())
}
}
val regionSplitPartitioner = val regionSplitPartitioner =
new BulkLoadPartitioner(startKeys) new BulkLoadPartitioner(startKeys)
@ -695,118 +650,414 @@ class HBaseContext(@transient sc: SparkContext,
val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
var rollOverRequested = false var rollOverRequested = false
val localTableName = TableName.valueOf(tableRawName)
/**
* This will roll all writers
*/
def rollWriters(): Unit = {
writerMap.values.foreach( wl => {
if (wl.writer != null) {
logDebug("Writer=" + wl.writer.getPath +
(if (wl.written == 0) "" else ", wrote=" + wl.written))
close(wl.writer)
}
})
writerMap.clear()
rollOverRequested = false
}
/**
* This function will close a given HFile writer
* @param w The writer to close
*/
def close(w:StoreFile.Writer): Unit = {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()))
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true))
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude))
w.appendTrackedTimestampsToMetadata()
w.close()
}
}
//Here is where we finally iterate through the data in this partition of the //Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned //RDD that has been sorted and partitioned
it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) => it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
//This will get a writer for the column family val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
//If there is no writer for a given column family then
//it will get created here.
val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {
val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))
fs.mkdirs(familyDir)
val loc:HRegionLocation = {
try {
val locator =
conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
locator.getRegionLocation(keyFamilyQualifier.rowKey)
} catch {
case e: Throwable =>
logWarning("there's something wrong when locating rowkey: " +
Bytes.toString(keyFamilyQualifier.rowKey))
null
}
}
if (null == loc) {
if (log.isTraceEnabled) {
logTrace("failed to get region location, so use default writer: " +
Bytes.toString(keyFamilyQualifier.rowKey))
}
getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
fs = fs, familydir = familyDir)
} else {
if (log.isDebugEnabled) {
logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
}
val initialIsa =
new InetSocketAddress(loc.getHostname, loc.getPort)
if (initialIsa.isUnresolved) {
if (log.isTraceEnabled) {
logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+ loc.getPort + ", so use default writer")
}
getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
} else {
if(log.isDebugEnabled) {
logDebug("use favored nodes writer: " + initialIsa.getHostString)
}
getNewWriter(keyFamilyQualifier.family, conf,
Array[InetSocketAddress](initialIsa), fs, familyDir)
}
}
})
val keyValue =new KeyValue(keyFamilyQualifier.rowKey,
keyFamilyQualifier.family, keyFamilyQualifier.family,
keyFamilyQualifier.qualifier, keyFamilyQualifier.qualifier,
now,cellValue) cellValue,
nowTimeStamp,
wl.writer.append(keyValue) fs,
wl.written += keyValue.getLength conn,
localTableName,
conf,
familyHFileWriteOptionsMapInternal,
hfileCompression,
writerMap,
stagingDir)
rollOverRequested = rollOverRequested || wl.written > maxSize rollOverRequested = rollOverRequested || wl.written > maxSize
//This will only roll if we have at least one column family file that is //This will only roll if we have at least one column family file that is
//bigger then maxSize and we have finished a given row key //bigger then maxSize and we have finished a given row key
if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
rollWriters() rollWriters(writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
} }
previousRow = keyFamilyQualifier.rowKey previousRow = keyFamilyQualifier.rowKey
} }
//We have finished all the data so lets close up the writers //We have finished all the data so lets close up the writers
rollWriters() rollWriters(writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
}) })
} }
/**
* Spark Implementation of HBase Bulk load for short rows some where less then
* a 1000 columns. This bulk load should be faster for tables will thinner
* rows then the other spark implementation of bulk load that puts only one
* value into a record going into a shuffle
*
* This will take the content from an existing RDD then sort and shuffle
* it with respect to region splits. The result of that sort and shuffle
* will be written to HFiles.
*
* After this function is executed the user will have to call
* LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
*
* In this implementation, only the rowKey is given to the shuffle as the key
* and all the columns are already linked to the RowKey before the shuffle
* stage. The sorting of the qualifier is done in memory out side of the
* shuffle stage
*
* Also make sure that incoming RDDs only have one record for every row key.
*
* @param rdd The RDD we are bulk loading from
* @param tableName The HBase table we are loading into
* @param mapFunction A function that will convert the RDD records to
* the key value format used for the shuffle to prep
* for writing to the bulk loaded HFiles
* @param stagingDir The location on the FileSystem to bulk load into
* @param familyHFileWriteOptionsMap Options that will define how the HFile for a
* column family is written
* @param compactionExclude Compaction excluded for the HFiles
* @param maxSize Max size for the HFiles before they roll
* @tparam T The Type of values in the original RDD
*/
def bulkLoadThinRows[T](rdd:RDD[T],
tableName: TableName,
mapFunction: (T) =>
(ByteArrayWrapper, FamiliesQualifiersValues),
stagingDir:String,
familyHFileWriteOptionsMap:
util.Map[Array[Byte], FamilyHFileWriteOptions] =
new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude: Boolean = false,
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
Unit = {
val conn = ConnectionFactory.createConnection(config)
val regionLocator = conn.getRegionLocator(tableName)
val startKeys = regionLocator.getStartKeys
val defaultCompressionStr = config.get("hfile.compression",
Compression.Algorithm.NONE.getName)
val defaultCompression = HFileWriterImpl
.compressionByName(defaultCompressionStr)
val nowTimeStamp = System.currentTimeMillis()
val tableRawName = tableName.getName
val familyHFileWriteOptionsMapInternal =
new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
while (entrySetIt.hasNext) {
val entry = entrySetIt.next()
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
}
val regionSplitPartitioner =
new BulkLoadPartitioner(startKeys)
//This is where all the magic happens
//Here we are going to do the following things
// 1. FlapMap every row in the RDD into key column value tuples
// 2. Then we are going to repartition sort and shuffle
// 3. Finally we are going to write out our HFiles
rdd.map( r => mapFunction(r)).
repartitionAndSortWithinPartitions(regionSplitPartitioner).
hbaseForeachPartition(this, (it, conn) => {
val conf = broadcastedConf.value.value
val fs = FileSystem.get(conf)
val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
var rollOverRequested = false
val localTableName = TableName.valueOf(tableRawName)
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
it.foreach{ case (rowKey:ByteArrayWrapper,
familiesQualifiersValues:FamiliesQualifiersValues) =>
if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
throw new KeyAlreadyExistsException("The following key was sent to the " +
"HFile load more then one: " + Bytes.toString(previousRow))
}
//The family map is a tree map so the families will be sorted
val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
while (familyIt.hasNext) {
val familyEntry = familyIt.next()
val family = familyEntry.getKey.value
val qualifierIt = familyEntry.getValue.entrySet().iterator()
//The qualifier map is a tree map so the families will be sorted
while (qualifierIt.hasNext) {
val qualifierEntry = qualifierIt.next()
val qualifier = qualifierEntry.getKey
val cellValue = qualifierEntry.getValue
writeValueToHFile(rowKey.value,
family,
qualifier.value, // qualifier
cellValue, // value
nowTimeStamp,
fs,
conn,
localTableName,
conf,
familyHFileWriteOptionsMapInternal,
defaultCompression,
writerMap,
stagingDir)
previousRow = rowKey.value
}
writerMap.values.foreach( wl => {
rollOverRequested = rollOverRequested || wl.written > maxSize
//This will only roll if we have at least one column family file that is
//bigger then maxSize and we have finished a given row key
if (rollOverRequested) {
rollWriters(writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
}
})
}
}
//This will get a writer for the column family
//If there is no writer for a given column family then
//it will get created here.
//We have finished all the data so lets close up the writers
rollWriters(writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
})
}
/**
* This will return a new HFile writer when requested
*
* @param family column family
* @param conf configuration to connect to HBase
* @param favoredNodes nodes that we would like to write too
* @param fs FileSystem object where we will be writing the HFiles to
* @return WriterLength object
*/
private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
favoredNodes: Array[InetSocketAddress],
fs:FileSystem,
familydir:Path,
familyHFileWriteOptionsMapInternal:
util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
defaultCompression:Compression.Algorithm): WriterLength = {
var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
if (familyOptions == null) {
familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
}
val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(familyOptions.blockSize)
contextBuilder.withDataBlockEncoding(DataBlockEncoding.
valueOf(familyOptions.dataBlockEncoding))
val hFileContext = contextBuilder.build()
if (null == favoredNodes) {
new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
} else {
new WriterLength(0,
new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build())
}
}
/**
* Encompasses the logic to write a value to an HFile
*
* @param rowKey The RowKey for the record
* @param family HBase column family for the record
* @param qualifier HBase column qualifier for the record
* @param cellValue HBase cell value
* @param nowTimeStamp The cell time stamp
* @param fs Connection to the FileSystem for the HFile
* @param conn Connection to HBaes
* @param tableName HBase TableName object
* @param conf Configuration to be used when making a new HFile
* @param familyHFileWriteOptionsMapInternal Extra configs for the HFile
* @param hfileCompression The compression codec for the new HFile
* @param writerMap HashMap of existing writers and their offsets
* @param stagingDir The staging directory on the FileSystem to store
* the HFiles
* @return The writer for the given HFile that was writen
* too
*/
private def writeValueToHFile(rowKey: Array[Byte],
family: Array[Byte],
qualifier: Array[Byte],
cellValue:Array[Byte],
nowTimeStamp: Long,
fs: FileSystem,
conn: Connection,
tableName: TableName,
conf: Configuration,
familyHFileWriteOptionsMapInternal:
util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
hfileCompression:Compression.Algorithm,
writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
stagingDir: String
): WriterLength = {
val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
val familyDir = new Path(stagingDir, Bytes.toString(family))
fs.mkdirs(familyDir)
val loc:HRegionLocation = {
try {
val locator =
conn.getRegionLocator(tableName)
locator.getRegionLocation(rowKey)
} catch {
case e: Throwable =>
logWarning("there's something wrong when locating rowkey: " +
Bytes.toString(rowKey))
null
}
}
if (null == loc) {
if (log.isTraceEnabled) {
logTrace("failed to get region location, so use default writer: " +
Bytes.toString(rowKey))
}
getNewHFileWriter(family = family,
conf = conf,
favoredNodes = null,
fs = fs,
familydir = familyDir,
familyHFileWriteOptionsMapInternal,
hfileCompression)
} else {
if (log.isDebugEnabled) {
logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
}
val initialIsa =
new InetSocketAddress(loc.getHostname, loc.getPort)
if (initialIsa.isUnresolved) {
if (log.isTraceEnabled) {
logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+ loc.getPort + ", so use default writer")
}
getNewHFileWriter(family,
conf,
null,
fs,
familyDir,
familyHFileWriteOptionsMapInternal,
hfileCompression)
} else {
if(log.isDebugEnabled) {
logDebug("use favored nodes writer: " + initialIsa.getHostString)
}
getNewHFileWriter(family,
conf,
Array[InetSocketAddress](initialIsa),
fs,
familyDir,
familyHFileWriteOptionsMapInternal,
hfileCompression)
}
}
})
val keyValue =new KeyValue(rowKey,
family,
qualifier,
nowTimeStamp,cellValue)
wl.writer.append(keyValue)
wl.written += keyValue.getLength
wl
}
/**
* This will roll all Writers
* @param writerMap HashMap that contains all the writers
* @param regionSplitPartitioner The partitioner with knowledge of how the
* Region's are split by row key
* @param previousRow The last row to fill the HFile ending range metadata
* @param compactionExclude The exclude compaction metadata flag for the HFile
*/
private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
regionSplitPartitioner: BulkLoadPartitioner,
previousRow: Array[Byte],
compactionExclude: Boolean): Unit = {
writerMap.values.foreach( wl => {
if (wl.writer != null) {
logDebug("Writer=" + wl.writer.getPath +
(if (wl.written == 0) "" else ", wrote=" + wl.written))
closeHFileWriter(wl.writer,
regionSplitPartitioner,
previousRow,
compactionExclude)
}
})
writerMap.clear()
}
/**
* Function to close an HFile
* @param w HFile Writer
* @param regionSplitPartitioner The partitioner with knowledge of how the
* Region's are split by row key
* @param previousRow The last row to fill the HFile ending range metadata
* @param compactionExclude The exclude compaction metadata flag for the HFile
*/
private def closeHFileWriter(w: StoreFile.Writer,
regionSplitPartitioner: BulkLoadPartitioner,
previousRow: Array[Byte],
compactionExclude: Boolean): Unit = {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()))
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true))
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude))
w.appendTrackedTimestampsToMetadata()
w.close()
}
}
/** /**
* This is a wrapper class around StoreFile.Writer. The reason for the * This is a wrapper class around StoreFile.Writer. The reason for the
* wrapper is to keep the length of the file along side the writer * wrapper is to keep the length of the file along side the writer
@ -815,30 +1066,6 @@ class HBaseContext(@transient sc: SparkContext,
* @param writer The number of bytes written to the writer * @param writer The number of bytes written to the writer
*/ */
class WriterLength(var written:Long, val writer:StoreFile.Writer) class WriterLength(var written:Long, val writer:StoreFile.Writer)
/**
* This is a wrapper over a byte array so it can work as
* a key in a hashMap
*
* @param o1 The Byte Array value
*/
class ByteArrayWrapper (val o1:Array[Byte])
extends Comparable[ByteArrayWrapper] with Serializable {
override def compareTo(o2: ByteArrayWrapper): Int = {
Bytes.compareTo(o1,o2.o1)
}
override def equals(o2: Any): Boolean = {
o2 match {
case wrapper: ByteArrayWrapper =>
Bytes.equals(o1, wrapper.o1)
case _ =>
false
}
}
override def hashCode():Int = {
Bytes.hashCode(o1)
}
}
} }
object LatestHBaseContextCache { object LatestHBaseContextCache {

View File

@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.spark
import java.util import java.util
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HConstants, TableName} import org.apache.hadoop.hbase.{HConstants, TableName}
import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import scala.collection.immutable.HashMap
import scala.reflect.ClassTag import scala.reflect.ClassTag
/** /**
@ -164,8 +162,8 @@ object HBaseRDDFunctions
} }
/** /**
* Implicit method that gives easy access to HBaseContext's * Spark Implementation of HBase Bulk load for wide rows or when
* bulkLoad method. * values are not already combined at the time of the map process
* *
* A Spark Implementation of HBase Bulk load * A Spark Implementation of HBase Bulk load
* *
@ -203,5 +201,51 @@ object HBaseRDDFunctions
flatMap, stagingDir, familyHFileWriteOptionsMap, flatMap, stagingDir, familyHFileWriteOptionsMap,
compactionExclude, maxSize) compactionExclude, maxSize)
} }
/**
* Implicit method that gives easy access to HBaseContext's
* bulkLoadThinRows method.
*
* Spark Implementation of HBase Bulk load for short rows some where less then
* a 1000 columns. This bulk load should be faster for tables will thinner
* rows then the other spark implementation of bulk load that puts only one
* value into a record going into a shuffle
*
* This will take the content from an existing RDD then sort and shuffle
* it with respect to region splits. The result of that sort and shuffle
* will be written to HFiles.
*
* After this function is executed the user will have to call
* LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
*
* In this implementation only the rowKey is given to the shuffle as the key
* and all the columns are already linked to the RowKey before the shuffle
* stage. The sorting of the qualifier is done in memory out side of the
* shuffle stage
*
* @param tableName The HBase table we are loading into
* @param mapFunction A function that will convert the RDD records to
* the key value format used for the shuffle to prep
* for writing to the bulk loaded HFiles
* @param stagingDir The location on the FileSystem to bulk load into
* @param familyHFileWriteOptionsMap Options that will define how the HFile for a
* column family is written
* @param compactionExclude Compaction excluded for the HFiles
* @param maxSize Max size for the HFiles before they roll
*/
def hbaseBulkLoadThinRows(hc: HBaseContext,
tableName: TableName,
mapFunction: (T) =>
(ByteArrayWrapper, FamiliesQualifiersValues),
stagingDir:String,
familyHFileWriteOptionsMap:
util.Map[Array[Byte], FamilyHFileWriteOptions] =
new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
compactionExclude: Boolean = false,
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
hc.bulkLoadThinRows(rdd, tableName,
mapFunction, stagingDir, familyHFileWriteOptionsMap,
compactionExclude, maxSize)
}
} }
} }

View File

@ -65,8 +65,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
sc.stop() sc.stop()
} }
test("Basic Test multi family and multi column tests " + test("Wide Row Bulk Load: Test multi family and multi column tests " +
"with all default HFile Configs") { "with all default HFile Configs.") {
val config = TEST_UTIL.getConfiguration val config = TEST_UTIL.getConfiguration
logInfo(" - creating table " + tableName) logInfo(" - creating table " + tableName)
@ -81,36 +81,38 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// 5. There are records will a single qualifier and some with two // 5. There are records will a single qualifier and some with two
val rdd = sc.parallelize(Array( val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"), (Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))), (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))), (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
(Bytes.toBytes("5"), (Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
(Bytes.toBytes("4"), (Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
(Bytes.toBytes("4"), (Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
(Bytes.toBytes("2"), (Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
(Bytes.toBytes("2"), (Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
val hbaseContext = new HBaseContext(sc, config) val hbaseContext = new HBaseContext(sc, config)
testFolder.create() testFolder.create()
val stagingFolder = testFolder.newFolder() val stagingFolder = testFolder.newFolder()
hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd,
TableName.valueOf(tableName), TableName.valueOf(tableName),
t => { t => {
val rowKey = t._1 val rowKey = t._1
val family:Array[Byte] = t._2(0)._1 val family:Array[Byte] = t._2._1
val qualifier = t._2(0)._2 val qualifier = t._2._2
val value = t._2(0)._3 val value:Array[Byte] = t._2._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
@ -188,7 +190,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
} }
} }
test("bulkLoad to test HBase client: Test Roll Over and " + test("Wide Row Bulk Load: Test HBase client: Test Roll Over and " +
"using an implicit call to bulk load") { "using an implicit call to bulk load") {
val config = TEST_UTIL.getConfiguration val config = TEST_UTIL.getConfiguration
@ -204,23 +206,23 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// 5. There are records will a single qualifier and some with two // 5. There are records will a single qualifier and some with two
val rdd = sc.parallelize(Array( val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"), (Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
(Bytes.toBytes("5"), (Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
(Bytes.toBytes("4"), (Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
(Bytes.toBytes("4"), (Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
(Bytes.toBytes("2"), (Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
(Bytes.toBytes("2"), (Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
val hbaseContext = new HBaseContext(sc, config) val hbaseContext = new HBaseContext(sc, config)
@ -231,9 +233,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
TableName.valueOf(tableName), TableName.valueOf(tableName),
t => { t => {
val rowKey = t._1 val rowKey = t._1
val family:Array[Byte] = t._2(0)._1 val family:Array[Byte] = t._2._1
val qualifier = t._2(0)._2 val qualifier = t._2._2
val value = t._2(0)._3 val value = t._2._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
@ -314,7 +316,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
} }
} }
test("Basic Test multi family and multi column tests" + test("Wide Row Bulk Load: Test multi family and multi column tests" +
" with one column family with custom configs plus multi region") { " with one column family with custom configs plus multi region") {
val config = TEST_UTIL.getConfiguration val config = TEST_UTIL.getConfiguration
@ -335,23 +337,23 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// 5. There are records will a single qualifier and some with two // 5. There are records will a single qualifier and some with two
val rdd = sc.parallelize(Array( val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"), (Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a")))), (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b")))), (Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
(Bytes.toBytes("3"), (Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
(Bytes.toBytes("5"), (Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
(Bytes.toBytes("4"), (Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
(Bytes.toBytes("4"), (Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2")))), (Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
(Bytes.toBytes("2"), (Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1")))), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
(Bytes.toBytes("2"), (Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))) (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2")))))
val hbaseContext = new HBaseContext(sc, config) val hbaseContext = new HBaseContext(sc, config)
@ -365,13 +367,13 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options) familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
hbaseContext.bulkLoad[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](rdd,
TableName.valueOf(tableName), TableName.valueOf(tableName),
t => { t => {
val rowKey = t._1 val rowKey = t._1
val family:Array[Byte] = t._2(0)._1 val family:Array[Byte] = t._2._1
val qualifier = t._2(0)._2 val qualifier = t._2._2
val value = t._2(0)._3 val value = t._2._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
@ -473,7 +475,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
} }
} }
test("bulkLoad partitioner tests") { test("Test partitioner") {
var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3) var splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](3)
splitKeys(0) = Bytes.toBytes("") splitKeys(0) = Bytes.toBytes("")
@ -530,8 +532,425 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(5 == partitioner.getPartition(Bytes.toBytes("11"))) assert(5 == partitioner.getPartition(Bytes.toBytes("11")))
assert(6 == partitioner.getPartition(Bytes.toBytes("12"))) assert(6 == partitioner.getPartition(Bytes.toBytes("12")))
assert(6 == partitioner.getPartition(Bytes.toBytes("13"))) assert(6 == partitioner.getPartition(Bytes.toBytes("13")))
} }
test("Thin Row Bulk Load: Test multi family and multi column tests " +
"with all default HFile Configs") {
val config = TEST_UTIL.getConfiguration
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName),
Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
//There are a number of tests in here.
// 1. Row keys are not in order
// 2. Qualifiers are not in order
// 3. Column Families are not in order
// 4. There are tests for records in one column family and some in two column families
// 5. There are records will a single qualifier and some with two
val rdd = sc.parallelize(Array(
("1",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3",
(Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
("3",
(Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
("5",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
("4",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
("4",
(Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
("2",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
("2",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))).
groupByKey()
val hbaseContext = new HBaseContext(sc, config)
testFolder.create()
val stagingFolder = testFolder.newFolder()
hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
t => {
val rowKey = Bytes.toBytes(t._1)
val familyQualifiersValues = new FamiliesQualifiersValues
t._2.foreach(f => {
val family:Array[Byte] = f._1
val qualifier = f._2
val value:Array[Byte] = f._3
familyQualifiersValues +=(family, qualifier, value)
})
(new ByteArrayWrapper(rowKey), familyQualifiersValues)
},
stagingFolder.getPath)
val fs = FileSystem.get(config)
assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
val conn = ConnectionFactory.createConnection(config)
val load = new LoadIncrementalHFiles(config)
val table = conn.getTable(TableName.valueOf(tableName))
try {
load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table,
conn.getRegionLocator(TableName.valueOf(tableName)))
val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
assert(cells5.size == 1)
assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
assert(cells4.size == 2)
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
assert(cells3.size == 3)
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
assert(cells2.size == 2)
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
assert(cells1.size == 1)
assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
} finally {
table.close()
val admin = ConnectionFactory.createConnection(config).getAdmin
try {
admin.disableTable(TableName.valueOf(tableName))
admin.deleteTable(TableName.valueOf(tableName))
} finally {
admin.close()
}
fs.delete(new Path(stagingFolder.getPath), true)
testFolder.delete()
}
}
test("Thin Row Bulk Load: Test HBase client: Test Roll Over and " +
"using an implicit call to bulk load") {
val config = TEST_UTIL.getConfiguration
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName),
Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)))
//There are a number of tests in here.
// 1. Row keys are not in order
// 2. Qualifiers are not in order
// 3. Column Families are not in order
// 4. There are tests for records in one column family and some in two column families
// 5. There are records will a single qualifier and some with two
val rdd = sc.parallelize(Array(
("1",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
("5",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
("4",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
("4",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
("2",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
("2",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))).
groupByKey()
val hbaseContext = new HBaseContext(sc, config)
testFolder.create()
val stagingFolder = testFolder.newFolder()
rdd.hbaseBulkLoadThinRows(hbaseContext,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val familyQualifiersValues = new FamiliesQualifiersValues
t._2.foreach(f => {
val family:Array[Byte] = f._1
val qualifier = f._2
val value:Array[Byte] = f._3
familyQualifiersValues +=(family, qualifier, value)
})
(new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
},
stagingFolder.getPath,
new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude = false,
20)
val fs = FileSystem.get(config)
assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 1)
assert(fs.listStatus(new Path(stagingFolder.getPath+ "/f1")).length == 5)
val conn = ConnectionFactory.createConnection(config)
val load = new LoadIncrementalHFiles(config)
val table = conn.getTable(TableName.valueOf(tableName))
try {
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
assert(cells5.size == 1)
assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
assert(cells4.size == 2)
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
assert(cells3.size == 3)
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.a"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("b"))
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.c"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("c"))
val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
assert(cells2.size == 2)
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
assert(cells1.size == 1)
assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
} finally {
table.close()
val admin = ConnectionFactory.createConnection(config).getAdmin
try {
admin.disableTable(TableName.valueOf(tableName))
admin.deleteTable(TableName.valueOf(tableName))
} finally {
admin.close()
}
fs.delete(new Path(stagingFolder.getPath), true)
testFolder.delete()
}
}
test("Thin Row Bulk Load: Test multi family and multi column tests" +
" with one column family with custom configs plus multi region") {
val config = TEST_UTIL.getConfiguration
val splitKeys:Array[Array[Byte]] = new Array[Array[Byte]](2)
splitKeys(0) = Bytes.toBytes("2")
splitKeys(1) = Bytes.toBytes("4")
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName),
Array(Bytes.toBytes(columnFamily1), Bytes.toBytes(columnFamily2)),
splitKeys)
//There are a number of tests in here.
// 1. Row keys are not in order
// 2. Qualifiers are not in order
// 3. Column Families are not in order
// 4. There are tests for records in one column family and some in two column families
// 5. There are records will a single qualifier and some with two
val rdd = sc.parallelize(Array(
("1",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3",
(Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo2.a"))),
("3",
(Bytes.toBytes(columnFamily2), Bytes.toBytes("a"), Bytes.toBytes("foo2.b"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.c"))),
("5",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
("4",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
("4",
(Bytes.toBytes(columnFamily2), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
("2",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
("2",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("bar.2"))))).
groupByKey()
val hbaseContext = new HBaseContext(sc, config)
testFolder.create()
val stagingFolder = testFolder.newFolder()
val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128,
"PREFIX")
familyHBaseWriterOptions.put(Bytes.toBytes(columnFamily1), f1Options)
hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val familyQualifiersValues = new FamiliesQualifiersValues
t._2.foreach(f => {
val family:Array[Byte] = f._1
val qualifier = f._2
val value:Array[Byte] = f._3
familyQualifiersValues +=(family, qualifier, value)
})
(new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
},
stagingFolder.getPath,
familyHBaseWriterOptions,
compactionExclude = false,
HConstants.DEFAULT_MAX_FILE_SIZE)
val fs = FileSystem.get(config)
assert(fs.listStatus(new Path(stagingFolder.getPath)).length == 2)
val f1FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f1"))
for ( i <- 0 until f1FileList.length) {
val reader = HFile.createReader(fs, f1FileList(i).getPath,
new CacheConfig(config), config)
assert(reader.getCompressionAlgorithm.getName.equals("gz"))
assert(reader.getDataBlockEncoding.name().equals("PREFIX"))
}
assert( 3 == f1FileList.length)
val f2FileList = fs.listStatus(new Path(stagingFolder.getPath +"/f2"))
for ( i <- 0 until f2FileList.length) {
val reader = HFile.createReader(fs, f2FileList(i).getPath,
new CacheConfig(config), config)
assert(reader.getCompressionAlgorithm.getName.equals("none"))
assert(reader.getDataBlockEncoding.name().equals("NONE"))
}
assert( 2 == f2FileList.length)
val conn = ConnectionFactory.createConnection(config)
val load = new LoadIncrementalHFiles(config)
val table = conn.getTable(TableName.valueOf(tableName))
try {
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
val cells5 = table.get(new Get(Bytes.toBytes("5"))).listCells()
assert(cells5.size == 1)
assert(Bytes.toString(CellUtil.cloneValue(cells5.get(0))).equals("foo3"))
assert(Bytes.toString(CellUtil.cloneFamily(cells5.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells5.get(0))).equals("a"))
val cells4 = table.get(new Get(Bytes.toBytes("4"))).listCells()
assert(cells4.size == 2)
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(0))).equals("foo.1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells4.get(1))).equals("foo.2"))
assert(Bytes.toString(CellUtil.cloneFamily(cells4.get(1))).equals("f2"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells4.get(1))).equals("b"))
val cells3 = table.get(new Get(Bytes.toBytes("3"))).listCells()
assert(cells3.size == 3)
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(0))).equals("foo2.c"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(1))).equals("foo2.b"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(1))).equals("f2"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(1))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells3.get(2))).equals("foo2.a"))
assert(Bytes.toString(CellUtil.cloneFamily(cells3.get(2))).equals("f2"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells3.get(2))).equals("b"))
val cells2 = table.get(new Get(Bytes.toBytes("2"))).listCells()
assert(cells2.size == 2)
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(0))).equals("bar.1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(0))).equals("a"))
assert(Bytes.toString(CellUtil.cloneValue(cells2.get(1))).equals("bar.2"))
assert(Bytes.toString(CellUtil.cloneFamily(cells2.get(1))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells2.get(1))).equals("b"))
val cells1 = table.get(new Get(Bytes.toBytes("1"))).listCells()
assert(cells1.size == 1)
assert(Bytes.toString(CellUtil.cloneValue(cells1.get(0))).equals("foo1"))
assert(Bytes.toString(CellUtil.cloneFamily(cells1.get(0))).equals("f1"))
assert(Bytes.toString(CellUtil.cloneQualifier(cells1.get(0))).equals("a"))
} finally {
table.close()
val admin = ConnectionFactory.createConnection(config).getAdmin
try {
admin.disableTable(TableName.valueOf(tableName))
admin.deleteTable(TableName.valueOf(tableName))
} finally {
admin.close()
}
fs.delete(new Path(stagingFolder.getPath), true)
testFolder.delete()
}
}
} }