Implementing AMQ-4134: Add XA support to the LevelDB store.

More test cases are now working.  Only the durable sub XA cases are broken at this point.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401911 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-24 23:08:37 +00:00
parent ebd3d34e09
commit 111635ea1c
4 changed files with 113 additions and 47 deletions

View File

@ -21,7 +21,7 @@ import org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent._
import atomic._
import org.fusesource.hawtbuf.Buffer
import org.fusesource.hawtbuf.{DataByteArrayOutputStream, DataByteArrayInputStream, ByteArrayOutputStream, Buffer}
import org.apache.activemq.store.MessageRecoveryListener
import java.lang.ref.WeakReference
import scala.Option._
@ -41,7 +41,7 @@ case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange()
case class SubAckRecord(subKey:Long, ackPosition:Long)
case class XaAckRecord(container:Long, seq:Long, ack:Buffer)
case class XaAckRecord(container:Long, seq:Long, ack:MessageAck)
sealed trait UowState {
def stage:Int
@ -130,7 +130,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val uowId:Int = manager.lastUowId.incrementAndGet()
var actions = Map[MessageId, MessageAction]()
var subAcks = ListBuffer[SubAckRecord]()
var xaAcks = ListBuffer[XaAckRecord]()
var completed = false
var disableDelay = false
var delayableActions = 0
@ -147,25 +146,26 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
def syncNeeded = syncFlag || actions.find( _._2.syncNeeded ).isDefined
def size = 100+actions.foldLeft(0L){ case (sum, entry) =>
sum + (entry._2.size+100)
} + (subAcks.size * 100) + xaAcks.foldLeft(0L){ case (sum, entry) =>
sum + entry.ack.length
}
} + (subAcks.size * 100)
class MessageAction {
var id:MessageId = _
var messageRecord: MessageRecord = null
var enqueues = ListBuffer[QueueEntryRecord]()
var dequeues = ListBuffer[QueueEntryRecord]()
var xaAcks = ListBuffer[XaAckRecord]()
def uow = DelayableUOW.this
def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
def isEmpty() = messageRecord==null && enqueues.isEmpty && dequeues.isEmpty && xaAcks.isEmpty
def cancel() = {
uow.rm(id)
}
def syncNeeded = messageRecord!=null && messageRecord.syncNeeded
def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50)
def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + xaAcks.foldLeft(0L){ case (sum, entry) =>
sum + 100
}
def addToPendingStore() = {
var set = manager.pendingStores.get(id)
@ -222,8 +222,10 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
}
def xaAck(container:Long, seq:Long, ack:MessageAck) = {
var packet = manager.parent.wireFormat.marshal(ack)
xaAcks += XaAckRecord(container, seq, new Buffer(packet.data, packet.offset, packet.length))
this.synchronized {
getAction(ack.getLastMessageId).xaAcks+=(XaAckRecord(container, seq, ack))
}
countDownFuture
}
def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean) = {
@ -641,12 +643,10 @@ class DBManager(val parent:LevelDBStore) {
def getXAActions(key:Long) = {
val msgs = ListBuffer[Message]()
val acks = ListBuffer[MessageAck]()
println("transactionCursor")
client.transactionCursor(key) { command =>
println("recovered command: "+command)
command match {
case message:Message => msgs += message
case ack:MessageAck => acks += ack
case record:XaAckRecord => acks += record.ack
}
true
}

View File

@ -944,13 +944,21 @@ class LevelDBClient(store: LevelDBStore) {
}
}
def transactionCursor(collectionKey: Long)(func: (DataStructure)=>Boolean) = {
def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
val seq = decodeLong(key)
if( value.getMeta != null ) {
val data = value.getMeta
val ack = store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[MessageAck].asInstanceOf[MessageAck]
func(ack)
val is = new DataByteArrayInputStream(value.getMeta);
val log = is.readLong()
val offset = is.readInt()
val qid = is.readLong()
val seq = is.readLong()
val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
ack.getLastMessageId.setDataLocator((log, offset))
ack.getLastMessageId.setEntryLocator((qid, seq))
func(XaAckRecord(collectionKey, seq, ack))
} else {
var locator = (value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
@ -1068,7 +1076,7 @@ class LevelDBClient(store: LevelDBStore) {
dataLocator = entry.id.getDataLocator match {
case x:(Long, Int) => x
case x:MessageRecord => x.locator
case _ => throw new RuntimeException("Unexpected locator type")
case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
}
}
@ -1126,18 +1134,37 @@ class LevelDBClient(store: LevelDBStore) {
write_enqueue_total += System.nanoTime() - start
}
}
action.xaAcks.foreach { entry =>
val ack = entry.ack
if( dataLocator==null ) {
dataLocator = ack.getLastMessageId.getDataLocator match {
case x:(Long, Int) => x
case x:MessageRecord => x.locator
case _ =>
throw new RuntimeException("Unexpected locator type")
}
}
val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
val os = new DataByteArrayOutputStream()
os.writeLong(dataLocator._1)
os.writeInt(dataLocator._2)
os.writeLong(qid)
os.writeLong(seq)
store.wireFormat.marshal(ack, os)
var ack_encoded = os.toBuffer
val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.container)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setMeta(ack_encoded)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setMeta(ack_encoded)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
uow.xaAcks.foreach { entry =>
val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.container)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setMeta(entry.ack)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValue(entry.ack)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
uow.subAcks.foreach { entry =>

View File

@ -229,7 +229,9 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
for ( ack <- acks ) {
// think we might have store design issue /w XA transactions and durable sub acks.
// does it even work for the other stores?
transaction.remove(createMessageStore(ack.getDestination), ack);
var store = createMessageStore(ack.getDestination)
store.preparedAcks.add(ack.getLastMessageId)
transaction.remove(store, ack);
}
}
debug("started")
@ -322,16 +324,25 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
def remove(store:LevelDBStore#LevelDBMessageStore, ack:MessageAck) = {
commitActions += new TransactionAction() {
def commit(uow:DelayableUOW) = {
store.doRemove(uow, ack.getLastMessageId)
if( prepared ) {
store.preparedAcks.remove(ack.getLastMessageId)
}
}
def prepare(uow:DelayableUOW) = {
// add it to the xa container instead of the actual store container.
uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
xarecovery._2 += ack
store.preparedAcks.add(ack.getLastMessageId)
}
def rollback(uow: DelayableUOW) {
if( prepared ) {
store.preparedAcks.remove(ack.getLastMessageId)
}
}
}
}
@ -380,6 +391,15 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
println("The transaction does not exist")
case Some(tx)=>
if( tx.prepared ) {
val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions ) {
action.rollback(uow)
}
uow.syncFlag = true
uow.addCompleteListener { done.countDown() }
}
done.await()
db.removeTransactionContainer(tx.xacontainer_id)
}
}
@ -394,12 +414,18 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
}
}
var doingRecover = false
def recover(listener: TransactionRecoveryListener) = {
for ( (txid, transaction) <- transactions ) {
if( transaction.prepared ) {
val (msgs, acks) = transaction.xarecovery
listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
this.doingRecover = true
try {
for ( (txid, transaction) <- transactions ) {
if( transaction.prepared ) {
val (msgs, acks) = transaction.xarecovery
listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
}
}
} finally {
this.doingRecover = false
}
}
@ -490,6 +516,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
protected val lastSeq: AtomicLong = new AtomicLong(0)
protected var cursorPosition: Long = 0
val preparedAcks = new HashSet[MessageId]()
lastSeq.set(db.getLastQueueEntrySeq(key))
@ -556,7 +583,25 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
}
def recover(listener: MessageRecoveryListener): Unit = {
cursorPosition = db.cursorMessages(key, listener, 0)
cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0)
}
def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener {
def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
def hasSpace = listener.hasSpace
def recoverMessageReference(ref: MessageId) = {
if (!preparedAcks.contains(ref)) {
listener.recoverMessageReference(ref)
}
true
}
def recoverMessage(message: Message) = {
if (!preparedAcks.contains(message.getMessageId)) {
listener.recoverMessage(message)
}
true
}
}
def resetBatching: Unit = {
@ -564,7 +609,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), cursorPosition)
cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition)
}
override def setBatch(id: MessageId): Unit = {
@ -697,7 +742,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), sub.cursorPosition.max(sub.lastAckPosition+1))
sub.cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
}
}

View File

@ -27,21 +27,15 @@ public class LevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
broker.setPersistenceAdapter(store);
}
// TODO: The following test cases are failing...
@Override
public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
}
@Override
public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
}
@Override
public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
// XA Durable Subs not yet implemented
// super.testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback();
}
@Override
public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
// XA Durable Subs not yet implemented
// super.testTopicPersistentPreparedAcksAvailableAfterRollback();
}
}