HBASE-17047 Add an API to get HBase connection cache statistics (Weiqing Yang)

This commit is contained in:
tedyu 2016-11-11 06:50:01 -08:00
parent 8a6d6aa239
commit 81623a353c
2 changed files with 69 additions and 10 deletions

View File

@ -34,6 +34,8 @@ private[spark] object HBaseConnectionCache extends Logging {
// A hashmap of Spark-HBase connections. Key is HBaseConnectionKey. // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]() val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()
val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
// in milliseconds // in milliseconds
private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
private var timeout = DEFAULT_TIME_OUT private var timeout = DEFAULT_TIME_OUT
@ -58,6 +60,13 @@ private[spark] object HBaseConnectionCache extends Logging {
housekeepingThread.setDaemon(true) housekeepingThread.setDaemon(true)
housekeepingThread.start() housekeepingThread.start()
def getStat: HBaseConnectionCacheStat = {
connectionMap.synchronized {
cacheStat.numActiveConnections = connectionMap.size
cacheStat.copy()
}
}
def close(): Unit = { def close(): Unit = {
try { try {
connectionMap.synchronized { connectionMap.synchronized {
@ -100,7 +109,9 @@ private[spark] object HBaseConnectionCache extends Logging {
connectionMap.synchronized { connectionMap.synchronized {
if (closed) if (closed)
return null return null
val sc = connectionMap.getOrElseUpdate(key, new SmartConnection(conn)) cacheStat.numTotalRequests += 1
val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
new SmartConnection(conn)})
sc.refCount += 1 sc.refCount += 1
sc sc
} }
@ -240,4 +251,15 @@ class HBaseConnectionKey(c: Configuration) extends Logging {
} }
} }
/**
* To log the state of 'HBaseConnectionCache'
*
* @param numTotalRequests number of total connection requests to the cache
* @param numActualConnectionsCreated number of actual HBase connections the cache ever created
* @param numActiveConnections number of current alive HBase connections the cache is holding
*/
case class HBaseConnectionCacheStat(var numTotalRequests: Long,
var numActualConnectionsCreated: Long,
var numActiveConnections: Long)

View File

@ -76,11 +76,18 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
testWithPressureWithClose() testWithPressureWithClose()
} }
def testBasic() { def cleanEnv() {
HBaseConnectionCache.setTimeout(1 * 1000)
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
HBaseConnectionCache.connectionMap.clear() HBaseConnectionCache.connectionMap.clear()
HBaseConnectionCache.cacheStat.numActiveConnections = 0
HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0
HBaseConnectionCache.cacheStat.numTotalRequests = 0
} }
}
def testBasic() {
cleanEnv()
HBaseConnectionCache.setTimeout(1 * 1000)
val connKeyMocker1 = new HBaseConnectionKeyMocker(1) val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
val connKeyMocker1a = new HBaseConnectionKeyMocker(1) val connKeyMocker1a = new HBaseConnectionKeyMocker(1)
@ -88,11 +95,20 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
val c1 = HBaseConnectionCache val c1 = HBaseConnectionCache
.getConnection(connKeyMocker1, new ConnectionMocker) .getConnection(connKeyMocker1, new ConnectionMocker)
assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.getStat.numTotalRequests === 1)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
val c1a = HBaseConnectionCache val c1a = HBaseConnectionCache
.getConnection(connKeyMocker1a, new ConnectionMocker) .getConnection(connKeyMocker1a, new ConnectionMocker)
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 1) assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.getStat.numTotalRequests === 2)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
} }
val c2 = HBaseConnectionCache val c2 = HBaseConnectionCache
@ -100,16 +116,21 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 2) assert(HBaseConnectionCache.connectionMap.size === 2)
assert(HBaseConnectionCache.getStat.numTotalRequests === 3)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
} }
c1.close() c1.close()
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 2) assert(HBaseConnectionCache.connectionMap.size === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
} }
c1a.close() c1a.close()
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 2) assert(HBaseConnectionCache.connectionMap.size === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
} }
Thread.sleep(3 * 1000) // Leave housekeeping thread enough time Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
@ -117,12 +138,15 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
assert(HBaseConnectionCache.connectionMap.size === 1) assert(HBaseConnectionCache.connectionMap.size === 1)
assert(HBaseConnectionCache.connectionMap.iterator.next()._1 assert(HBaseConnectionCache.connectionMap.iterator.next()._1
.asInstanceOf[HBaseConnectionKeyMocker].confId === 2) .asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
} }
c2.close() c2.close()
} }
def testWithPressureWithoutClose() { def testWithPressureWithoutClose() {
cleanEnv()
class TestThread extends Runnable { class TestThread extends Runnable {
override def run() { override def run() {
for (i <- 0 to 999) { for (i <- 0 to 999) {
@ -147,6 +171,10 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
Thread.sleep(1000) Thread.sleep(1000)
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 10) assert(HBaseConnectionCache.connectionMap.size === 10)
assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
var totalRc : Int = 0 var totalRc : Int = 0
HBaseConnectionCache.connectionMap.foreach { HBaseConnectionCache.connectionMap.foreach {
x => totalRc += x._2.refCount x => totalRc += x._2.refCount
@ -161,9 +189,13 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
} }
Thread.sleep(1000) Thread.sleep(1000)
assert(HBaseConnectionCache.connectionMap.size === 0) assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
} }
def testWithPressureWithClose() { def testWithPressureWithClose() {
cleanEnv()
class TestThread extends Runnable { class TestThread extends Runnable {
override def run() { override def run() {
for (i <- 0 to 999) { for (i <- 0 to 999) {
@ -189,11 +221,16 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 10) assert(HBaseConnectionCache.connectionMap.size === 10)
assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
} }
Thread.sleep(6 * 1000) Thread.sleep(6 * 1000)
HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.synchronized {
assert(HBaseConnectionCache.connectionMap.size === 0) assert(HBaseConnectionCache.connectionMap.size === 0)
assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
} }
} }
} }