Fixing https://issues.apache.org/jira/browse/AMQ-4917 : LevelDB store can fail when using durable subs.

We were browsing durable sub entries which had been concurrently GCed causing leveldb store failures which then caused the broker to restart.
This commit is contained in:
Hiram Chirino 2013-12-03 19:20:11 -05:00
parent 69e35d6c4a
commit 8378cb1ffc
4 changed files with 103 additions and 95 deletions

View File

@ -37,7 +37,9 @@ import org.apache.activemq.ActiveMQMessageAuditNoSync
import org.fusesource.hawtdispatch
case class EntryLocator(qid:Long, seq:Long)
case class DataLocator(store:LevelDBStore, pos:Long, len:Int)
case class DataLocator(store:LevelDBStore, pos:Long, len:Int) {
override def toString: String = "DataLocator(%x, %d)".format(pos, len)
}
case class MessageRecord(store:LevelDBStore, id:MessageId, data:Buffer, syncNeeded:Boolean) {
var locator:DataLocator = _
}
@ -860,8 +862,12 @@ class DBManager(val parent:LevelDBStore) {
def getMessage(x: MessageId):Message = {
val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
val locator = id.getDataLocator()
val msg = client.getMessageWithRetry(locator)
msg.setMessageId(id)
val msg = client.getMessage(locator)
if( msg!=null ) {
msg.setMessageId(id)
} else {
LevelDBStore.warn("Could not load messages for: "+x+" at: "+locator)
}
msg
}

View File

@ -21,7 +21,6 @@ import java.{lang=>jl}
import java.{util=>ju}
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.concurrent.atomic.AtomicBoolean
import collection.immutable.TreeMap
import collection.mutable.{HashMap, ListBuffer}
import org.iq80.leveldb._
@ -37,20 +36,13 @@ import org.apache.activemq.command.{MessageAck, Message}
import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
import java.text.SimpleDateFormat
import java.util.{Date, Collections}
import org.apache.activemq.leveldb.util.TimeMetric
import org.apache.activemq.leveldb.RecordLog.LogInfo
import org.fusesource.leveldbjni.internal.JniDB
import org.apache.activemq.ActiveMQMessageAuditNoSync
import java.util.zip.CRC32
import org.apache.activemq.leveldb.util.TimeMetric
import org.fusesource.hawtbuf.ByteArrayInputStream
import org.apache.activemq.leveldb.RecordLog.LogInfo
import scala.Some
import scala.Serializable
import org.apache.activemq.leveldb.XaAckRecord
import org.apache.activemq.leveldb.MessageRecord
import org.apache.activemq.leveldb.EntryLocator
import org.apache.activemq.leveldb.DataLocator
import org.fusesource.hawtbuf.ByteArrayOutputStream
import org.apache.activemq.broker.SuppressReplyException
@ -772,19 +764,20 @@ class LevelDBClient(store: LevelDBStore) {
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(record.getValueLocation)
index_record.setValueLength(record.getValueLength)
val index_value = encodeEntryRecord(index_record.freeze()).toByteArray
if( record.hasValueLength ) {
index_record.setValueLength(record.getValueLength)
}
val index_value = encodeEntryRecord(index_record.freeze()).toByteArray
replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
if( kind==LOG_ADD_ENTRY ) {
if ( record.hasValueLocation ) {
logRefIncrement(record.getValueLocation)
}
logRefIncrement(record.getValueLocation)
collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
} else {
trace("Replay of LOG_UPDATE_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
}
trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
case LOG_REMOVE_ENTRY =>
val record = decodeEntryRecord(data)
@ -834,10 +827,9 @@ class LevelDBClient(store: LevelDBStore) {
private def logRefDecrement(pos: Long) {
for( key <- logRefKey(pos) ) {
logRefs.get(key).foreach { counter =>
if (counter.decrementAndGet() == 0) {
logRefs.remove(key)
}
logRefs.get(key) match {
case Some(counter) => counter.decrementAndGet() == 0
case None => warn("invalid: logRefDecrement: "+pos)
}
}
}
@ -1252,11 +1244,16 @@ class LevelDBClient(store: LevelDBStore) {
collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
val seq = decodeLong(key)
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
val msg = getMessageWithRetry(locator)
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
func(msg)
val msg = getMessage(locator)
if( msg !=null ) {
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
func(msg)
} else {
warn("Could not load message seq: "+seq+" from "+locator)
true
}
}
}
@ -1278,10 +1275,15 @@ class LevelDBClient(store: LevelDBStore) {
func(XaAckRecord(collectionKey, seq, ack, sub))
} else {
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
val msg = getMessageWithRetry(locator)
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
func(msg)
val msg = getMessage(locator)
if( msg !=null ) {
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
func(msg)
} else {
warn("Could not load XA message seq: "+seq+" from "+locator)
true
}
}
}
}
@ -1295,22 +1297,6 @@ class LevelDBClient(store: LevelDBStore) {
}
}
def getMessageWithRetry(locator:AnyRef):Message = {
var retry = 0
var rc = getMessage(locator);
while( rc == null ) {
if( retry > 10 )
return null;
Thread.sleep(retry*10)
rc = getMessage(locator);
retry+=1
}
if( retry > 0 ) {
info("Recovered from 'failed getMessage' on retry: "+retry)
}
rc
}
def getMessage(locator:AnyRef):Message = {
assert(locator!=null)
val buffer = locator match {
@ -1487,9 +1473,7 @@ class LevelDBClient(store: LevelDBStore) {
batch.put(key, index_data)
if( kind==LOG_ADD_ENTRY ) {
for (key <- logRefKey(dataLocator.pos, log_info)) {
logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
}
logRefIncrement(dataLocator.pos)
collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
}
@ -1537,11 +1521,11 @@ class LevelDBClient(store: LevelDBStore) {
log_record.setCollectionKey(entry.subKey)
log_record.setEntryKey(ACK_POSITION)
log_record.setValueLocation(entry.ackPosition)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
appender.append(LOG_UPDATE_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(entry.ackPosition)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
}
if (uow.syncNeeded) {
@ -1625,17 +1609,6 @@ class LevelDBClient(store: LevelDBStore) {
def gc(topicPositions:Seq[(Long, Long)]):Unit = {
detect_if_compact_needed
// Lets compact the leveldb index if it looks like we need to.
if( index.compact_needed ) {
debug("Compacting the leveldb index at: %s", dirtyIndexFile)
val start = System.nanoTime()
index.compact
val duration = System.nanoTime() - start;
info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0))
}
// Delete message refs for topics who's consumers have advanced..
if( !topicPositions.isEmpty ) {
might_fail_using_index {
@ -1646,6 +1619,7 @@ class LevelDBClient(store: LevelDBStore) {
ro.verifyChecksums(verifyChecksums)
val start = encodeEntryKey(ENTRY_PREFIX, topic, 0)
val end = encodeEntryKey(ENTRY_PREFIX, topic, first)
debug("Topic: %d GC to seq: %d", topic, first)
index.cursorRange(start, end, ro) { case (key, value) =>
val entry = EntryRecord.FACTORY.parseUnframed(value)
batch.delete(key)
@ -1657,8 +1631,30 @@ class LevelDBClient(store: LevelDBStore) {
}
}
detect_if_compact_needed
// Lets compact the leveldb index if it looks like we need to.
if( index.compact_needed ) {
val start = System.nanoTime()
index.compact
val duration = System.nanoTime() - start;
info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0))
}
import collection.JavaConversions._
lastIndexSnapshotPos
// drop the logs that are no longer referenced.
for( (x,y) <- logRefs.toSeq ) {
if( y.get() <= 0 ) {
if( y.get() < 0 ) {
warn("Found a negative log reference for log: "+x)
}
debug("Log no longer referenced: %x", x)
logRefs.remove(x)
}
}
val emptyJournals = log.log_infos.keySet.toSet -- logRefs.keySet
// We don't want to delete any journals that the index has not snapshot'ed or
@ -1669,6 +1665,7 @@ class LevelDBClient(store: LevelDBStore) {
emptyJournals.foreach { id =>
if ( id < deleteLimit ) {
debug("Deleting log at %x", id)
log.delete(id)
}
}

View File

@ -669,6 +669,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
lastSeq.set(db.getLastQueueEntrySeq(key))
def cursorResetPosition = 0L
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
check_running
val seq = lastSeq.incrementAndGet()
@ -731,7 +733,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def removeAllMessages(context: ConnectionContext): Unit = {
check_running
db.collectionEmpty(key)
cursorPosition = 0
cursorPosition = cursorResetPosition
}
def getMessageCount: Int = {
@ -744,11 +746,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def recover(listener: MessageRecoveryListener): Unit = {
check_running
cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition)
}
def resetBatching: Unit = {
cursorPosition = 0
cursorPosition = cursorResetPosition
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
@ -789,6 +791,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]()
var firstSeq = 0L
override def cursorResetPosition = firstSeq
def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {

View File

@ -517,42 +517,43 @@ case class RecordLog(directory: File, logSuffix:String) {
log_infos.map(_._2.position).toArray
}
private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
private def get_reader[T](record_position:Long)(func: (LogReader)=>T):Option[T] = {
val lookup = log_mutex.synchronized {
val info = log_info(record_position)
info.map { info=>
if(info.position == current_appender.position) {
current_appender.retain()
(info, current_appender)
} else {
(info, null)
}
val (info, appender) = log_mutex.synchronized {
log_info(record_position) match {
case None =>
warn("No reader available for position: %x, log_infos: %s", record_position, log_infos)
return None
case Some(info) =>
if(info.position == current_appender.position) {
current_appender.retain()
(info, current_appender)
} else {
(info, null)
}
}
}
lookup.map { case (info, appender) =>
val reader = if( appender!=null ) {
// read from the current appender.
appender
} else {
// Checkout a reader from the cache...
reader_cache.synchronized {
var reader = reader_cache.get(info.file)
if(reader==null) {
reader = LogReader(info.file, info.position)
reader_cache.put(info.file, reader)
}
reader.retain()
reader
val reader = if( appender!=null ) {
// read from the current appender.
appender
} else {
// Checkout a reader from the cache...
reader_cache.synchronized {
var reader = reader_cache.get(info.file)
if(reader==null) {
reader = LogReader(info.file, info.position)
reader_cache.put(info.file, reader)
}
reader.retain()
reader
}
}
try {
func(reader)
} finally {
reader.release
}
try {
Some(func(reader))
} finally {
reader.release
}
}