From 111635ea1c743b613cbb8cfda66dd67f405679bf Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 24 Oct 2012 23:08:37 +0000 Subject: [PATCH] Implementing AMQ-4134: Add XA support to the LevelDB store. More test cases are now working. Only the durable sub XA cases are broken at this point. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401911 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/leveldb/DBManager.scala | 26 ++++---- .../activemq/leveldb/LevelDBClient.scala | 59 +++++++++++++----- .../activemq/leveldb/LevelDBStore.scala | 61 ++++++++++++++++--- .../leveldb/LevelDBXARecoveryBrokerTest.java | 14 ++--- 4 files changed, 113 insertions(+), 47 deletions(-) diff --git a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 8158e2b58d..7fcd59b57f 100644 --- a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -21,7 +21,7 @@ import org.fusesource.hawtdispatch._ import org.fusesource.hawtdispatch.BaseRetained import java.util.concurrent._ import atomic._ -import org.fusesource.hawtbuf.Buffer +import org.fusesource.hawtbuf.{DataByteArrayOutputStream, DataByteArrayInputStream, ByteArrayOutputStream, Buffer} import org.apache.activemq.store.MessageRecoveryListener import java.lang.ref.WeakReference import scala.Option._ @@ -41,7 +41,7 @@ case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long) case class QueueRecord(id:ActiveMQDestination, queue_key:Long) case class QueueEntryRange() case class SubAckRecord(subKey:Long, ackPosition:Long) -case class XaAckRecord(container:Long, seq:Long, ack:Buffer) +case class XaAckRecord(container:Long, seq:Long, ack:MessageAck) sealed trait UowState { def stage:Int @@ -130,7 +130,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val uowId:Int = manager.lastUowId.incrementAndGet() var actions = Map[MessageId, MessageAction]() var subAcks = ListBuffer[SubAckRecord]() - var xaAcks = ListBuffer[XaAckRecord]() var completed = false var disableDelay = false var delayableActions = 0 @@ -147,25 +146,26 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { def syncNeeded = syncFlag || actions.find( _._2.syncNeeded ).isDefined def size = 100+actions.foldLeft(0L){ case (sum, entry) => sum + (entry._2.size+100) - } + (subAcks.size * 100) + xaAcks.foldLeft(0L){ case (sum, entry) => - sum + entry.ack.length - } + } + (subAcks.size * 100) class MessageAction { var id:MessageId = _ var messageRecord: MessageRecord = null var enqueues = ListBuffer[QueueEntryRecord]() var dequeues = ListBuffer[QueueEntryRecord]() + var xaAcks = ListBuffer[XaAckRecord]() def uow = DelayableUOW.this - def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil + def isEmpty() = messageRecord==null && enqueues.isEmpty && dequeues.isEmpty && xaAcks.isEmpty def cancel() = { uow.rm(id) } def syncNeeded = messageRecord!=null && messageRecord.syncNeeded - def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + xaAcks.foldLeft(0L){ case (sum, entry) => + sum + 100 + } def addToPendingStore() = { var set = manager.pendingStores.get(id) @@ -222,8 +222,10 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { } def xaAck(container:Long, seq:Long, ack:MessageAck) = { - var packet = manager.parent.wireFormat.marshal(ack) - xaAcks += XaAckRecord(container, seq, new Buffer(packet.data, packet.offset, packet.length)) + this.synchronized { + getAction(ack.getLastMessageId).xaAcks+=(XaAckRecord(container, seq, ack)) + } + countDownFuture } def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean) = { @@ -641,12 +643,10 @@ class DBManager(val parent:LevelDBStore) { def getXAActions(key:Long) = { val msgs = ListBuffer[Message]() val acks = ListBuffer[MessageAck]() - println("transactionCursor") client.transactionCursor(key) { command => - println("recovered command: "+command) command match { case message:Message => msgs += message - case ack:MessageAck => acks += ack + case record:XaAckRecord => acks += record.ack } true } diff --git a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 9861c8cd52..eb06fc7a5d 100755 --- a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -944,13 +944,21 @@ class LevelDBClient(store: LevelDBStore) { } } - def transactionCursor(collectionKey: Long)(func: (DataStructure)=>Boolean) = { + def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = { collectionCursor(collectionKey, encodeLong(0)) { (key, value) => val seq = decodeLong(key) if( value.getMeta != null ) { - val data = value.getMeta - val ack = store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[MessageAck].asInstanceOf[MessageAck] - func(ack) + + val is = new DataByteArrayInputStream(value.getMeta); + val log = is.readLong() + val offset = is.readInt() + val qid = is.readLong() + val seq = is.readLong() + val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck] + ack.getLastMessageId.setDataLocator((log, offset)) + ack.getLastMessageId.setEntryLocator((qid, seq)) + + func(XaAckRecord(collectionKey, seq, ack)) } else { var locator = (value.getValueLocation, value.getValueLength) val msg = getMessage(locator) @@ -1068,7 +1076,7 @@ class LevelDBClient(store: LevelDBStore) { dataLocator = entry.id.getDataLocator match { case x:(Long, Int) => x case x:MessageRecord => x.locator - case _ => throw new RuntimeException("Unexpected locator type") + case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator) } } @@ -1126,18 +1134,37 @@ class LevelDBClient(store: LevelDBStore) { write_enqueue_total += System.nanoTime() - start } - } + action.xaAcks.foreach { entry => + val ack = entry.ack + if( dataLocator==null ) { + dataLocator = ack.getLastMessageId.getDataLocator match { + case x:(Long, Int) => x + case x:MessageRecord => x.locator + case _ => + throw new RuntimeException("Unexpected locator type") + } + } + + val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)]; + val os = new DataByteArrayOutputStream() + os.writeLong(dataLocator._1) + os.writeInt(dataLocator._2) + os.writeLong(qid) + os.writeLong(seq) + store.wireFormat.marshal(ack, os) + var ack_encoded = os.toBuffer + + val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq) + val log_record = new EntryRecord.Bean() + log_record.setCollectionKey(entry.container) + log_record.setEntryKey(new Buffer(key, 9, 8)) + log_record.setMeta(ack_encoded) + appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze())) + val index_record = new EntryRecord.Bean() + index_record.setMeta(ack_encoded) + batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray) + } - uow.xaAcks.foreach { entry => - val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq) - val log_record = new EntryRecord.Bean() - log_record.setCollectionKey(entry.container) - log_record.setEntryKey(new Buffer(key, 9, 8)) - log_record.setMeta(entry.ack) - appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze())) - val index_record = new EntryRecord.Bean() - index_record.setValue(entry.ack) - batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray) } uow.subAcks.foreach { entry => diff --git a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 55b3cbe47b..ef156fec04 100644 --- a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -229,7 +229,9 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten for ( ack <- acks ) { // think we might have store design issue /w XA transactions and durable sub acks. // does it even work for the other stores? - transaction.remove(createMessageStore(ack.getDestination), ack); + var store = createMessageStore(ack.getDestination) + store.preparedAcks.add(ack.getLastMessageId) + transaction.remove(store, ack); } } debug("started") @@ -322,16 +324,25 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten def remove(store:LevelDBStore#LevelDBMessageStore, ack:MessageAck) = { commitActions += new TransactionAction() { + def commit(uow:DelayableUOW) = { store.doRemove(uow, ack.getLastMessageId) + if( prepared ) { + store.preparedAcks.remove(ack.getLastMessageId) + } } + def prepare(uow:DelayableUOW) = { // add it to the xa container instead of the actual store container. uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack) xarecovery._2 += ack + store.preparedAcks.add(ack.getLastMessageId) } def rollback(uow: DelayableUOW) { + if( prepared ) { + store.preparedAcks.remove(ack.getLastMessageId) + } } } } @@ -380,6 +391,15 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten println("The transaction does not exist") case Some(tx)=> if( tx.prepared ) { + val done = new CountDownLatch(1) + withUow { uow => + for( action <- tx.commitActions ) { + action.rollback(uow) + } + uow.syncFlag = true + uow.addCompleteListener { done.countDown() } + } + done.await() db.removeTransactionContainer(tx.xacontainer_id) } } @@ -394,12 +414,18 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten } } + var doingRecover = false def recover(listener: TransactionRecoveryListener) = { - for ( (txid, transaction) <- transactions ) { - if( transaction.prepared ) { - val (msgs, acks) = transaction.xarecovery - listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray); + this.doingRecover = true + try { + for ( (txid, transaction) <- transactions ) { + if( transaction.prepared ) { + val (msgs, acks) = transaction.xarecovery + listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray); + } } + } finally { + this.doingRecover = false } } @@ -490,6 +516,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten protected val lastSeq: AtomicLong = new AtomicLong(0) protected var cursorPosition: Long = 0 + val preparedAcks = new HashSet[MessageId]() lastSeq.set(db.getLastQueueEntrySeq(key)) @@ -556,7 +583,25 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten } def recover(listener: MessageRecoveryListener): Unit = { - cursorPosition = db.cursorMessages(key, listener, 0) + cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0) + } + + def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener { + def isDuplicate(ref: MessageId) = listener.isDuplicate(ref) + def hasSpace = listener.hasSpace + def recoverMessageReference(ref: MessageId) = { + if (!preparedAcks.contains(ref)) { + listener.recoverMessageReference(ref) + } + true + } + + def recoverMessage(message: Message) = { + if (!preparedAcks.contains(message.getMessageId)) { + listener.recoverMessage(message) + } + true + } } def resetBatching: Unit = { @@ -564,7 +609,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { - cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), cursorPosition) + cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition) } override def setBatch(id: MessageId): Unit = { @@ -697,7 +742,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = { lookup(clientId, subscriptionName).foreach { sub => - sub.cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), sub.cursorPosition.max(sub.lastAckPosition+1)) + sub.cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1)) } } diff --git a/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java b/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java index fa6ddf22dd..eaa7119f69 100644 --- a/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java +++ b/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java @@ -27,21 +27,15 @@ public class LevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest { broker.setPersistenceAdapter(store); } - // TODO: The following test cases are failing... - - @Override - public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception { - } - - @Override - public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { - } @Override public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { +// XA Durable Subs not yet implemented +// super.testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback(); } - @Override public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception { +// XA Durable Subs not yet implemented +// super.testTopicPersistentPreparedAcksAvailableAfterRollback(); } }