HBASE-15271 Spark bulk load should write to temporary location and then rename on success.

Signed-off-by: Ted Yu <tedyu@apache.org>
Signed-off-by: Jonathan Hsieh <jon@cloudera.com>
Signed-off-by: Sean Busbey <busbey@apache.org>
Ted Malaska 2016-03-08 17:26:29 -08:00 committed by Sean Busbey
parent 14217cef24
commit b29ce7f114
1 changed file with 36 additions and 21 deletions


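The change below applies a simple pattern to the HFiles produced by Spark bulk load: each file is first written under a temporary, clearly marked name and only renamed to its final name once the write has finished. A minimal sketch of that pattern in Scala, with a hypothetical writeThenRename helper that is not part of the patch:

    import java.io.IOException
    import java.util.UUID
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Minimal sketch of the write-to-temp-then-rename pattern this commit applies.
    // The helper name and signature are illustrative only.
    def writeThenRename(conf: Configuration, finalPath: Path)(write: Path => Unit): Unit = {
      val fs: FileSystem = finalPath.getFileSystem(conf)
      // Write to a '_'-prefixed temporary name in the same directory first.
      val tmpPath = new Path(finalPath.getParent, "_" + UUID.randomUUID.toString.replaceAll("-", ""))
      write(tmpPath)
      // Promote the file to its final name only after the write succeeded.
      if (!fs.rename(tmpPath, finalPath)) {
        throw new IOException("Unable to rename '" + tmpPath + "' to " + finalPath)
      }
    }
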
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.spark
 import java.net.InetSocketAddress
 import java.util
+import java.util.UUID
 import javax.management.openmbean.KeyAlreadyExistsException
 import org.apache.hadoop.hbase.fs.HFileSystem
@@ -680,7 +681,7 @@ class HBaseContext(@transient sc: SparkContext,
 //This will only roll if we have at least one column family file that is
 //bigger then maxSize and we have finished a given row key
 if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
-  rollWriters(writerMap,
+  rollWriters(fs, writerMap,
     regionSplitPartitioner,
     previousRow,
     compactionExclude)
@@ -690,7 +691,7 @@ class HBaseContext(@transient sc: SparkContext,
   previousRow = keyFamilyQualifier.rowKey
 }
 //We have finished all the data so lets close up the writers
-rollWriters(writerMap,
+rollWriters(fs, writerMap,
   regionSplitPartitioner,
   previousRow,
   compactionExclude)
@@ -830,7 +831,7 @@ class HBaseContext(@transient sc: SparkContext,
 //This will only roll if we have at least one column family file that is
 //bigger then maxSize and we have finished a given row key
 if (rollOverRequested) {
-  rollWriters(writerMap,
+  rollWriters(fs, writerMap,
     regionSplitPartitioner,
     previousRow,
     compactionExclude)
@@ -844,7 +845,7 @@ class HBaseContext(@transient sc: SparkContext,
 //If there is no writer for a given column family then
 //it will get created here.
 //We have finished all the data so lets close up the writers
-rollWriters(writerMap,
+rollWriters(fs, writerMap,
   regionSplitPartitioner,
   previousRow,
   compactionExclude)
@@ -889,17 +890,15 @@ class HBaseContext(@transient sc: SparkContext,
     valueOf(familyOptions.dataBlockEncoding))
   val hFileContext = contextBuilder.build()
-  if (null == favoredNodes) {
-    new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-      .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-      .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
-  } else {
-    new WriterLength(0,
-      new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-      .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-      .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-      .withFavoredNodes(favoredNodes).build())
-  }
+  //Add a '_' to the file name because this is a unfinished file. A rename will happen
+  // to remove the '_' when the file is closed.
+  new WriterLength(0,
+    new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+      .withBloomType(BloomType.valueOf(familyOptions.bloomType))
+      .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+      .withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
+      .withFavoredNodes(favoredNodes).build())
 }
/**
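The hunk above stops writing each HFile directly under its final name: the writer now targets a file named with a '_' prefix and a random UUID, so the file is recognizable as unfinished until it is renamed on close. A minimal sketch of that naming scheme, using a hypothetical helper name that is not part of the patch:

    import java.util.UUID
    import org.apache.hadoop.fs.Path

    // Hypothetical helper mirroring the naming scheme above: the '_' prefix marks
    // the HFile as in-progress; the prefix is stripped by a rename when the writer
    // is closed successfully.
    def inProgressHFilePath(familyDir: Path): Path =
      new Path(familyDir, "_" + UUID.randomUUID.toString.replaceAll("-", ""))
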
@@ -1013,13 +1012,15 @@ class HBaseContext(@transient sc: SparkContext,
 /**
  * This will roll all Writers
+ * @param fs Hadoop FileSystem object
  * @param writerMap HashMap that contains all the writers
  * @param regionSplitPartitioner The partitioner with knowledge of how the
  *                               Region's are split by row key
  * @param previousRow The last row to fill the HFile ending range metadata
  * @param compactionExclude The exclude compaction metadata flag for the HFile
  */
-private def rollWriters(writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
+private def rollWriters(fs:FileSystem,
+                        writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
                         regionSplitPartitioner: BulkLoadPartitioner,
                         previousRow: Array[Byte],
                         compactionExclude: Boolean): Unit = {
@@ -1027,7 +1028,7 @@ class HBaseContext(@transient sc: SparkContext,
 if (wl.writer != null) {
   logDebug("Writer=" + wl.writer.getPath +
     (if (wl.written == 0) "" else ", wrote=" + wl.written))
-  closeHFileWriter(wl.writer,
+  closeHFileWriter(fs, wl.writer,
     regionSplitPartitioner,
     previousRow,
     compactionExclude)
@@ -1039,16 +1040,18 @@ class HBaseContext(@transient sc: SparkContext,
 /**
  * Function to close an HFile
+ * @param fs Hadoop FileSystem object
  * @param w HFile Writer
  * @param regionSplitPartitioner The partitioner with knowledge of how the
  *                               Region's are split by row key
  * @param previousRow The last row to fill the HFile ending range metadata
  * @param compactionExclude The exclude compaction metadata flag for the HFile
  */
-private def closeHFileWriter(w: StoreFile.Writer,
-                             regionSplitPartitioner: BulkLoadPartitioner,
-                             previousRow: Array[Byte],
-                             compactionExclude: Boolean): Unit = {
+private def closeHFileWriter(fs:FileSystem,
+                             w: StoreFile.Writer,
+                             regionSplitPartitioner: BulkLoadPartitioner,
+                             previousRow: Array[Byte],
+                             compactionExclude: Boolean): Unit = {
   if (w != null) {
     w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
       Bytes.toBytes(System.currentTimeMillis()))
@@ -1060,6 +1063,18 @@ class HBaseContext(@transient sc: SparkContext,
       Bytes.toBytes(compactionExclude))
     w.appendTrackedTimestampsToMetadata()
     w.close()
+    val srcPath = w.getPath
+    //In the new path you will see that we are using substring. This is to
+    // remove the '_' character in front of the HFile name. '_' is a character
+    // that will tell HBase that this file shouldn't be included in the bulk load
+    // This feature is to protect for unfinished HFiles being submitted to HBase
+    val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
+    if (!fs.rename(srcPath, newPath)) {
+      throw new IOException("Unable to rename '" + srcPath +
+        "' to " + newPath)
+    }
   }
 }
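The added rename is what finishes an HFile: the leading '_' is stripped only after the writer has closed cleanly, and, as the added comment notes, HBase's bulk load skips files whose names start with '_'. A hedged illustration of why that marker protects against loading unfinished files (not HBase's actual code):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustration only: a discovery pass over a column family directory can skip
    // any HFile that still carries the in-progress '_' marker, so half-written
    // files never make it into the bulk load.
    def completedHFiles(fs: FileSystem, familyDir: Path): Seq[Path] =
      fs.listStatus(familyDir).toSeq
        .map(_.getPath)
        .filterNot(_.getName.startsWith("_"))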