Avoid NullPointerExceptions that can occur during leveldb replication M/S state transitions.

This commit is contained in:
Hiram Chirino 2013-10-15 11:45:45 -04:00
parent ee65ca4ee5
commit ef45a5f8ff
5 changed files with 48 additions and 10 deletions

View File

@ -929,7 +929,7 @@ class LevelDBClient(store: LevelDBStore) {
var wal_append_position = 0L
def stop() = {
def stop() = this.synchronized {
if( writeExecutor!=null ) {
writeExecutor.shutdown
writeExecutor.awaitTermination(60, TimeUnit.SECONDS)
@ -945,7 +945,7 @@ class LevelDBClient(store: LevelDBStore) {
index.close
index = null
}
if (log.isOpen) {
if (log!=null && log.isOpen) {
log.close
copyDirtyIndexToSnapshot
wal_append_position = log.appender_limit

View File

@ -184,6 +184,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
def check_running = {
if( this.isStopped ) {
throw new IOException("Store has been stopped")
}
}
def init() = {}
def createDefaultLocker() = {
@ -664,6 +670,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
lastSeq.set(db.getLastQueueEntrySeq(key))
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
check_running
val seq = lastSeq.incrementAndGet()
message.incrementReferenceCount()
uow.addCompleteListener({
@ -674,6 +681,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
check_running
message.getMessageId.setEntryLocator(null)
if( message.getTransactionId!=null ) {
transaction(message.getTransactionId).add(this, message, delay)
@ -687,6 +695,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
override def addMessage(context: ConnectionContext, message: Message) = addMessage(context, message, false)
override def addMessage(context: ConnectionContext, message: Message, delay: Boolean): Unit = {
check_running
waitOn(asyncAddQueueMessage(context, message, delay))
}
@ -695,6 +704,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = {
check_running
if( ack.getTransactionId!=null ) {
transaction(ack.getTransactionId).remove(this, ack)
} else {
@ -705,10 +715,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def removeMessage(context: ConnectionContext, ack: MessageAck): Unit = {
check_running
removeAsyncMessage(context, ack)
}
def getMessage(id: MessageId): Message = {
check_running
var message: Message = db.getMessage(id)
if (message == null) {
throw new IOException("Message id not found: " + id)
@ -717,6 +729,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def removeAllMessages(context: ConnectionContext): Unit = {
check_running
db.collectionEmpty(key)
cursorPosition = 0
}
@ -730,6 +743,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def recover(listener: MessageRecoveryListener): Unit = {
check_running
cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
}
@ -738,6 +752,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
check_running
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
}
@ -802,6 +817,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = {
check_running
var sub = db.addSubscription(key, info)
subscriptions.synchronized {
subscriptions.put((info.getClientId, info.getSubcriptionName), sub)
@ -815,14 +831,17 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized {
check_running
subscriptions.values.map(_.info).toArray
}
def lookupSubscription(clientId: String, subscriptionName: String): SubscriptionInfo = subscriptions.synchronized {
check_running
subscriptions.get((clientId, subscriptionName)).map(_.info).getOrElse(null)
}
def deleteSubscription(clientId: String, subscriptionName: String): Unit = {
check_running
subscriptions.synchronized {
subscriptions.remove((clientId, subscriptionName))
}.foreach(db.removeSubscription(_))
@ -839,6 +858,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String, messageId: MessageId, ack: MessageAck): Unit = {
check_running
lookup(clientId, subscriptionName).foreach { sub =>
var position = db.queuePosition(messageId)
if( ack.getTransactionId!=null ) {
@ -855,23 +875,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def resetBatching(clientId: String, subscriptionName: String): Unit = {
check_running
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = 0
}
}
def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = {
check_running
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
}
}
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
check_running
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
}
}
def getMessageCount(clientId: String, subscriptionName: String): Int = {
check_running
lookup(clientId, subscriptionName) match {
case Some(sub) =>
(lastSeq.get - sub.lastAckPosition).toInt
@ -889,10 +913,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def getName: String = name
def destroy() = {
check_running
removePList(name)
}
def addFirst(id: String, bs: ByteSequence): AnyRef = {
check_running
var pos = lastSeq.decrementAndGet()
add(pos, id, bs)
listSize.incrementAndGet()
@ -900,6 +926,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def addLast(id: String, bs: ByteSequence): AnyRef = {
check_running
var pos = lastSeq.incrementAndGet()
add(pos, id, bs)
listSize.incrementAndGet()
@ -907,6 +934,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def add(pos:Long, id: String, bs: ByteSequence) = {
check_running
val encoded_key = encodeLongLong(key, pos)
val encoded_id = new UTF8Buffer(id)
val os = new DataByteArrayOutputStream(2+encoded_id.length+bs.length)
@ -917,6 +945,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def remove(position: AnyRef): Boolean = {
check_running
val pos = position.asInstanceOf[java.lang.Long].longValue()
val encoded_key = encodeLongLong(key, pos)
db.plistGet(encoded_key) match {
@ -933,6 +962,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def size(): Long = listSize.get()
def iterator() = new PListIterator() {
check_running
val prefix = LevelDBClient.encodeLong(key)
var dbi = db.plistIterator
var last_key:Array[Byte] = _

View File

@ -137,7 +137,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def onAccept(transport: Transport) {
transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress))
transport.setBlockingExecutor(blocking_executor)
new Session(transport)
new Session(transport).start
}
def onAcceptError(error: Exception) {
warn(error)

View File

@ -107,6 +107,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
session.handler = wal_handler(session)
}
})
wal_session.start
}
def stop_connections(cb:Task) = {
@ -114,18 +115,20 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
unstash(directory)
cb.run()
}
if( wal_session !=null ) {
val next = then
then = ^{
wal_session.transport.stop(next)
val wal_session_copy = wal_session
if( wal_session_copy !=null ) {
wal_session = null
}
}
if( transfer_session !=null ) {
val next = then
then = ^{
transfer_session.transport.stop(next)
wal_session_copy.transport.stop(next)
}
}
val transfer_session_copy = transfer_session
if( transfer_session_copy !=null ) {
transfer_session = null
val next = then
then = ^{
transfer_session_copy.transport.stop(next)
}
}
then.run();
@ -414,6 +417,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
pending_log_removes.clear()
}
})
transfer_session.start
state.snapshot_position
}

View File

@ -33,7 +33,10 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe
transport.setProtocolCodec(codec)
transport.setTransportListener(this)
def start = {
transport.start(NOOP)
}
def onTransportConnected = transport.resumeRead()
def onTransportDisconnected() = {}