From f4f2b68238a094d7b1931dc8b7939742ccbb2b57 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 17 Apr 2018 19:45:53 -0700
Subject: [PATCH] HBASE-20421 HBasecontext creates a connection but does not close it (Yu Wang)

---
 .../hadoop/hbase/spark/HBaseContext.scala | 352 +++++++++---------
 1 file changed, 179 insertions(+), 173 deletions(-)

diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 656b8c2be01..e50a3e8a921 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -628,87 +628,90 @@ class HBaseContext(@transient val sc: SparkContext,
       throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
     }
     val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val hfileCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
+    try {
+      val regionLocator = conn.getRegionLocator(tableName)
+      val startKeys = regionLocator.getStartKeys
+      if (startKeys.length == 0) {
+        logInfo("Table " + tableName.toString + " was not found")
+      }
+      val defaultCompressionStr = config.get("hfile.compression",
+        Compression.Algorithm.NONE.getName)
+      val hfileCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr)
+      val nowTimeStamp = System.currentTimeMillis()
+      val tableRawName = tableName.getName

-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+      val familyHFileWriteOptionsMapInternal =
+        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
+      while (entrySetIt.hasNext) {
+        val entry = entrySetIt.next()
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+      }

-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
+      val regionSplitPartitioner =
+        new BulkLoadPartitioner(startKeys)

-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.flatMap( r => flatMap(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
+      //This is where all the magic happens
+      //Here we are going to do the following things
+      // 1. FlapMap every row in the RDD into key column value tuples
+      // 2. Then we are going to repartition sort and shuffle
+      // 3. Finally we are going to write out our HFiles
+      rdd.flatMap( r => flatMap(r)).
+        repartitionAndSortWithinPartitions(regionSplitPartitioner).
+        hbaseForeachPartition(this, (it, conn) => {

-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
+        val conf = broadcastedConf.value.value
+        val fs = FileSystem.get(conf)
+        val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+        var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+        var rollOverRequested = false
+        val localTableName = TableName.valueOf(tableRawName)

-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+        //Here is where we finally iterate through the data in this partition of the
+        //RDD that has been sorted and partitioned
+        it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>

-        val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
-          keyFamilyQualifier.family,
-          keyFamilyQualifier.qualifier,
-          cellValue,
-          nowTimeStamp,
-          fs,
-          conn,
-          localTableName,
-          conf,
-          familyHFileWriteOptionsMapInternal,
-          hfileCompression,
-          writerMap,
-          stagingDir)
+          val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
+            keyFamilyQualifier.family,
+            keyFamilyQualifier.qualifier,
+            cellValue,
+            nowTimeStamp,
+            fs,
+            conn,
+            localTableName,
+            conf,
+            familyHFileWriteOptionsMapInternal,
+            hfileCompression,
+            writerMap,
+            stagingDir)

-        rollOverRequested = rollOverRequested || wl.written > maxSize
+          rollOverRequested = rollOverRequested || wl.written > maxSize

-        //This will only roll if we have at least one column family file that is
-        //bigger then maxSize and we have finished a given row key
-        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+          //This will only roll if we have at least one column family file that is
+          //bigger then maxSize and we have finished a given row key
+          if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+            rollWriters(fs, writerMap,
+              regionSplitPartitioner,
+              previousRow,
+              compactionExclude)
+            rollOverRequested = false
+          }
+
+          previousRow = keyFamilyQualifier.rowKey
+        }
+        //We have finished all the data so lets close up the writers
          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
          rollOverRequested = false
-        }
-
-        previousRow = keyFamilyQualifier.rowKey
-      }
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-    if(null != conn) conn.close()
+      })
+    } finally {
+      if(null != conn) conn.close()
+    }
   }

   /**
@@ -760,118 +763,121 @@ class HBaseContext(@transient val sc: SparkContext,
       throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
     }
     val conn = HBaseConnectionCache.getConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    if (startKeys.length == 0) {
-      logInfo("Table " + tableName.toString + " was not found")
-    }
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val defaultCompression = HFileWriterImpl
-      .compressionByName(defaultCompressionStr)
-    val nowTimeStamp = System.currentTimeMillis()
-    val tableRawName = tableName.getName
+    try {
+      val regionLocator = conn.getRegionLocator(tableName)
+      val startKeys = regionLocator.getStartKeys
+      if (startKeys.length == 0) {
+        logInfo("Table " + tableName.toString + " was not found")
+      }
+      val defaultCompressionStr = config.get("hfile.compression",
+        Compression.Algorithm.NONE.getName)
+      val defaultCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr)
+      val nowTimeStamp = System.currentTimeMillis()
+      val tableRawName = tableName.getName

-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+      val familyHFileWriteOptionsMapInternal =
+        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.map( r => mapFunction(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-      val localTableName = TableName.valueOf(tableRawName)
-
-      //Here is where we finally iterate through the data in this partition of the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (rowKey:ByteArrayWrapper,
-        familiesQualifiersValues:FamiliesQualifiersValues) =>
-
-
-        if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
-          throw new KeyAlreadyExistsException("The following key was sent to the " +
-            "HFile load more then one: " + Bytes.toString(previousRow))
-        }
-
-        //The family map is a tree map so the families will be sorted
-        val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
-        while (familyIt.hasNext) {
-          val familyEntry = familyIt.next()
-
-          val family = familyEntry.getKey.value
-
-          val qualifierIt = familyEntry.getValue.entrySet().iterator()
-
-          //The qualifier map is a tree map so the families will be sorted
-          while (qualifierIt.hasNext) {
-
-            val qualifierEntry = qualifierIt.next()
-            val qualifier = qualifierEntry.getKey
-            val cellValue = qualifierEntry.getValue
-
-            writeValueToHFile(rowKey.value,
-              family,
-              qualifier.value, // qualifier
-              cellValue, // value
-              nowTimeStamp,
-              fs,
-              conn,
-              localTableName,
-              conf,
-              familyHFileWriteOptionsMapInternal,
-              defaultCompression,
-              writerMap,
-              stagingDir)
-
-            previousRow = rowKey.value
-          }
-
-          writerMap.values.foreach( wl => {
-            rollOverRequested = rollOverRequested || wl.written > maxSize
-
-            //This will only roll if we have at least one column family file that is
-            //bigger then maxSize and we have finished a given row key
-            if (rollOverRequested) {
-              rollWriters(fs, writerMap,
-                regionSplitPartitioner,
-                previousRow,
-                compactionExclude)
-              rollOverRequested = false
-            }
-          })
-        }
+      while (entrySetIt.hasNext) {
+        val entry = entrySetIt.next()
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
       }

-      //This will get a writer for the column family
-      //If there is no writer for a given column family then
-      //it will get created here.
-      //We have finished all the data so lets close up the writers
-      rollWriters(fs, writerMap,
-        regionSplitPartitioner,
-        previousRow,
-        compactionExclude)
-      rollOverRequested = false
-    })
-    if(null != conn) conn.close()
+      val regionSplitPartitioner =
+        new BulkLoadPartitioner(startKeys)
+
+      //This is where all the magic happens
+      //Here we are going to do the following things
+      // 1. FlapMap every row in the RDD into key column value tuples
+      // 2. Then we are going to repartition sort and shuffle
+      // 3. Finally we are going to write out our HFiles
+      rdd.map( r => mapFunction(r)).
+        repartitionAndSortWithinPartitions(regionSplitPartitioner).
+        hbaseForeachPartition(this, (it, conn) => {
+
+        val conf = broadcastedConf.value.value
+        val fs = FileSystem.get(conf)
+        val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+        var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+        var rollOverRequested = false
+        val localTableName = TableName.valueOf(tableRawName)
+
+        //Here is where we finally iterate through the data in this partition of the
+        //RDD that has been sorted and partitioned
+        it.foreach{ case (rowKey:ByteArrayWrapper,
+          familiesQualifiersValues:FamiliesQualifiersValues) =>
+
+
+          if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
+            throw new KeyAlreadyExistsException("The following key was sent to the " +
+              "HFile load more then one: " + Bytes.toString(previousRow))
+          }
+
+          //The family map is a tree map so the families will be sorted
+          val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
+          while (familyIt.hasNext) {
+            val familyEntry = familyIt.next()
+
+            val family = familyEntry.getKey.value
+
+            val qualifierIt = familyEntry.getValue.entrySet().iterator()
+
+            //The qualifier map is a tree map so the families will be sorted
+            while (qualifierIt.hasNext) {
+
+              val qualifierEntry = qualifierIt.next()
+              val qualifier = qualifierEntry.getKey
+              val cellValue = qualifierEntry.getValue
+
+              writeValueToHFile(rowKey.value,
+                family,
+                qualifier.value, // qualifier
+                cellValue, // value
+                nowTimeStamp,
+                fs,
+                conn,
+                localTableName,
+                conf,
+                familyHFileWriteOptionsMapInternal,
+                defaultCompression,
+                writerMap,
+                stagingDir)
+
+              previousRow = rowKey.value
+            }
+
+            writerMap.values.foreach( wl => {
+              rollOverRequested = rollOverRequested || wl.written > maxSize
+
+              //This will only roll if we have at least one column family file that is
+              //bigger then maxSize and we have finished a given row key
+              if (rollOverRequested) {
+                rollWriters(fs, writerMap,
+                  regionSplitPartitioner,
+                  previousRow,
+                  compactionExclude)
+                rollOverRequested = false
+              }
+            })
+          }
+        }
+
+        //This will get a writer for the column family
+        //If there is no writer for a given column family then
+        //it will get created here.
+        //We have finished all the data so lets close up the writers
+        rollWriters(fs, writerMap,
+          regionSplitPartitioner,
+          previousRow,
+          compactionExclude)
+        rollOverRequested = false
+      })
+    } finally {
+      if(null != conn) conn.close()
+    }
   }

   /**
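
For reference, both hunks apply the same acquire/use/release shape: the connection obtained from HBaseConnectionCache.getConnection(config) is now used inside a try block and released in a finally block, so it is closed even when building or writing the HFiles throws. The following is only a minimal, self-contained Scala sketch of that discipline; FakeConnection and withResource are illustrative stand-ins and are not part of the patch or of the hbase-spark API.

    object ConnectionCloseSketch {
      // Stand-in for the pooled HBase connection; only close() matters for this example.
      final class FakeConnection extends AutoCloseable {
        def startKeyCount: Int = 3                       // pretend region-locator lookup
        override def close(): Unit = println("connection closed")
      }

      // The try/finally shape the patch introduces: the resource is always released,
      // even if the body throws, mirroring `try { ... } finally { conn.close() }`.
      def withResource[A <: AutoCloseable, B](acquire: => A)(use: A => B): B = {
        val resource = acquire
        try {
          use(resource)
        } finally {
          if (resource != null) resource.close()
        }
      }

      def main(args: Array[String]): Unit = {
        val keys = withResource(new FakeConnection)(conn => conn.startKeyCount)
        println("start keys: " + keys)                   // printed after "connection closed"
      }
    }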