HBASE-20421 HBasecontext creates a connection but does not close it (Yu Wang)

tedyu 2018-04-17 19:45:53 -07:00
parent fd2cec75f3
commit f4f2b68238
1 changed file with 179 additions and 173 deletions
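Both hunks below apply the same resource-management fix: the body of the bulk load is wrapped in try/finally so the connection obtained from HBaseConnectionCache is closed on every exit path, not only on success. A minimal sketch of that shape, assuming a generic AutoCloseable resource (the helper name and signature are illustrative, not part of the hbase-spark API):

object ConnectionPattern {
  // Sketch of the pattern this commit applies inline in both bulkLoad variants:
  // acquire the resource, do the work, and close in finally so a failure while
  // writing HFiles can no longer leak the cached connection.
  def runWithConnection[C <: AutoCloseable, T](acquire: => C)(work: C => T): T = {
    val conn = acquire
    try {
      work(conn)                      // may throw
    } finally {
      if (conn != null) conn.close()  // always runs, mirroring the patch
    }
  }
}

The patch itself does not introduce such a helper; it applies the try/finally shape directly around the existing code in each method, as the hunks below show.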


@@ -628,87 +628,90 @@ class HBaseContext(@transient val sc: SparkContext,
       throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
     }
     val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val hfileCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.flatMap( r => flatMap(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
-        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
-          keyFamilyQualifier.family,
-          keyFamilyQualifier.qualifier,
-          cellValue,
-          nowTimeStamp,
-          fs,
-          conn,
-          localTableName,
-          conf,
-          familyHFileWriteOptionsMapInternal,
-          hfileCompression,
-          writerMap,
-          stagingDir)
-        rollOverRequested = rollOverRequested || wl.written > maxSize
-        //This will only roll if we have at least one column family file that is
-        //bigger then maxSize and we have finished a given row key
-        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
-          rollWriters(fs, writerMap,
-            regionSplitPartitioner,
-            previousRow,
-            compactionExclude)
-          rollOverRequested = false
-        }
-        previousRow = keyFamilyQualifier.rowKey
-      }
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-    if(null != conn) conn.close()
+    try {
+      val regionLocator = conn.getRegionLocator(tableName)
+      val startKeys = regionLocator.getStartKeys
+      if (startKeys.length == 0) {
+        logInfo("Table " + tableName.toString + " was not found")
+      }
+      val defaultCompressionStr = config.get("hfile.compression",
+        Compression.Algorithm.NONE.getName)
+      val hfileCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr)
+      val nowTimeStamp = System.currentTimeMillis()
+      val tableRawName = tableName.getName
+      val familyHFileWriteOptionsMapInternal =
+        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+      while (entrySetIt.hasNext) {
+        val entry = entrySetIt.next()
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+      }
+      val regionSplitPartitioner =
+        new BulkLoadPartitioner(startKeys)
+      //This is where all the magic happens
+      //Here we are going to do the following things
+      // 1. FlapMap every row in the RDD into key column value tuples
+      // 2. Then we are going to repartition sort and shuffle
+      // 3. Finally we are going to write out our HFiles
+      rdd.flatMap( r => flatMap(r)).
+        repartitionAndSortWithinPartitions(regionSplitPartitioner).
+        hbaseForeachPartition(this, (it, conn) => {
+        val conf = broadcastedConf.value.value
+        val fs = FileSystem.get(conf)
+        val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+        var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+        var rollOverRequested = false
+        val localTableName = TableName.valueOf(tableRawName)
+        //Here is where we finally iterate through the data in this partition of the
+        //RDD that has been sorted and partitioned
+        it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+          val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
+            keyFamilyQualifier.family,
+            keyFamilyQualifier.qualifier,
+            cellValue,
+            nowTimeStamp,
+            fs,
+            conn,
+            localTableName,
+            conf,
+            familyHFileWriteOptionsMapInternal,
+            hfileCompression,
+            writerMap,
+            stagingDir)
+          rollOverRequested = rollOverRequested || wl.written > maxSize
+          //This will only roll if we have at least one column family file that is
+          //bigger then maxSize and we have finished a given row key
+          if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+            rollWriters(fs, writerMap,
+              regionSplitPartitioner,
+              previousRow,
+              compactionExclude)
+            rollOverRequested = false
+          }
+          previousRow = keyFamilyQualifier.rowKey
+        }
+        //We have finished all the data so lets close up the writers
+        rollWriters(fs, writerMap,
+          regionSplitPartitioner,
+          previousRow,
+          compactionExclude)
+        rollOverRequested = false
+      })
+    } finally {
+      if(null != conn) conn.close()
+    }
   }
   /**
@@ -760,118 +763,121 @@ class HBaseContext(@transient val sc: SparkContext,
       throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
     }
     val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val defaultCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.map( r => mapFunction(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (rowKey:ByteArrayWrapper,
-        familiesQualifiersValues:FamiliesQualifiersValues) =>
-        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
-          throw new KeyAlreadyExistsException("The following key was sent to the " +
-            "HFile load more then one: " + Bytes.toString(previousRow))
-        }
-        //The family map is a tree map so the families will be sorted
-        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
-        while (familyIt.hasNext) {
-          val familyEntry = familyIt.next()
-          val family = familyEntry.getKey.value
-          val qualifierIt = familyEntry.getValue.entrySet().iterator()
-          //The qualifier map is a tree map so the families will be sorted
-          while (qualifierIt.hasNext) {
-            val qualifierEntry = qualifierIt.next()
-            val qualifier = qualifierEntry.getKey
-            val cellValue = qualifierEntry.getValue
-            writeValueToHFile(rowKey.value,
-              family,
-              qualifier.value, // qualifier
-              cellValue, // value
-              nowTimeStamp,
-              fs,
-              conn,
-              localTableName,
-              conf,
-              familyHFileWriteOptionsMapInternal,
-              defaultCompression,
-              writerMap,
-              stagingDir)
-            previousRow = rowKey.value
-          }
-          writerMap.values.foreach( wl => {
-            rollOverRequested = rollOverRequested || wl.written > maxSize
-            //This will only roll if we have at least one column family file that is
-            //bigger then maxSize and we have finished a given row key
-            if (rollOverRequested) {
-              rollWriters(fs, writerMap,
-                regionSplitPartitioner,
-                previousRow,
-                compactionExclude)
-              rollOverRequested = false
-            }
-          })
-        }
-      }
-      //This will get a writer for the column family
-      //If there is no writer for a given column family then
-      //it will get created here.
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-    if(null != conn) conn.close()
+    try {
+      val regionLocator = conn.getRegionLocator(tableName)
+      val startKeys = regionLocator.getStartKeys
+      if (startKeys.length == 0) {
+        logInfo("Table " + tableName.toString + " was not found")
+      }
+      val defaultCompressionStr = config.get("hfile.compression",
+        Compression.Algorithm.NONE.getName)
+      val defaultCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr)
+      val nowTimeStamp = System.currentTimeMillis()
+      val tableRawName = tableName.getName
+      val familyHFileWriteOptionsMapInternal =
+        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+      while (entrySetIt.hasNext) {
+        val entry = entrySetIt.next()
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+      }
+      val regionSplitPartitioner =
+        new BulkLoadPartitioner(startKeys)
+      //This is where all the magic happens
+      //Here we are going to do the following things
+      // 1. FlapMap every row in the RDD into key column value tuples
+      // 2. Then we are going to repartition sort and shuffle
+      // 3. Finally we are going to write out our HFiles
+      rdd.map( r => mapFunction(r)).
+        repartitionAndSortWithinPartitions(regionSplitPartitioner).
+        hbaseForeachPartition(this, (it, conn) => {
+        val conf = broadcastedConf.value.value
+        val fs = FileSystem.get(conf)
+        val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+        var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+        var rollOverRequested = false
+        val localTableName = TableName.valueOf(tableRawName)
+        //Here is where we finally iterate through the data in this partition of the
+        //RDD that has been sorted and partitioned
+        it.foreach{ case (rowKey:ByteArrayWrapper,
+          familiesQualifiersValues:FamiliesQualifiersValues) =>
+          if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
+            throw new KeyAlreadyExistsException("The following key was sent to the " +
+              "HFile load more then one: " + Bytes.toString(previousRow))
+          }
+          //The family map is a tree map so the families will be sorted
+          val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
+          while (familyIt.hasNext) {
+            val familyEntry = familyIt.next()
+            val family = familyEntry.getKey.value
+            val qualifierIt = familyEntry.getValue.entrySet().iterator()
+            //The qualifier map is a tree map so the families will be sorted
+            while (qualifierIt.hasNext) {
+              val qualifierEntry = qualifierIt.next()
+              val qualifier = qualifierEntry.getKey
+              val cellValue = qualifierEntry.getValue
+              writeValueToHFile(rowKey.value,
+                family,
+                qualifier.value, // qualifier
+                cellValue, // value
+                nowTimeStamp,
+                fs,
+                conn,
+                localTableName,
+                conf,
+                familyHFileWriteOptionsMapInternal,
+                defaultCompression,
+                writerMap,
+                stagingDir)
+              previousRow = rowKey.value
+            }
+            writerMap.values.foreach( wl => {
+              rollOverRequested = rollOverRequested || wl.written > maxSize
+              //This will only roll if we have at least one column family file that is
+              //bigger then maxSize and we have finished a given row key
+              if (rollOverRequested) {
+                rollWriters(fs, writerMap,
+                  regionSplitPartitioner,
+                  previousRow,
+                  compactionExclude)
+                rollOverRequested = false
+              }
+            })
+          }
+        }
+        //This will get a writer for the column family
+        //If there is no writer for a given column family then
+        //it will get created here.
+        //We have finished all the data so lets close up the writers
+        rollWriters(fs, writerMap,
+          regionSplitPartitioner,
+          previousRow,
+          compactionExclude)
+        rollOverRequested = false
+      })
+    } finally {
+      if(null != conn) conn.close()
+    }
   }
   /**