diff --git a/activemq-leveldb/pom.xml b/activemq-leveldb/pom.xml
index 1487c8c899..7f9b6465d5 100644
--- a/activemq-leveldb/pom.xml
+++ b/activemq-leveldb/pom.xml
@@ -182,6 +182,16 @@
+
+ org.slf4j
+ slf4j-log4j12
+ test
+
+
+ log4j
+ log4j
+ test
+
org.apache.activemq
activemq-core
diff --git a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index c71debeace..8158e2b58d 100644
--- a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -31,6 +31,7 @@ import org.apache.activemq.leveldb.record.{SubscriptionRecord, CollectionRecord}
import util.TimeMetric
import java.util.HashMap
import collection.mutable.{HashSet, ListBuffer}
+import org.apache.activemq.util.ByteSequence
case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
var locator:(Long, Int) = _
@@ -40,6 +41,7 @@ case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange()
case class SubAckRecord(subKey:Long, ackPosition:Long)
+case class XaAckRecord(container:Long, seq:Long, ack:Buffer)
sealed trait UowState {
def stage:Int
@@ -128,6 +130,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val uowId:Int = manager.lastUowId.incrementAndGet()
var actions = Map[MessageId, MessageAction]()
var subAcks = ListBuffer[SubAckRecord]()
+ var xaAcks = ListBuffer[XaAckRecord]()
var completed = false
var disableDelay = false
var delayableActions = 0
@@ -140,10 +143,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
this._state = next
}
- def syncNeeded = actions.find( _._2.syncNeeded ).isDefined
+ var syncFlag = false
+ def syncNeeded = syncFlag || actions.find( _._2.syncNeeded ).isDefined
def size = 100+actions.foldLeft(0L){ case (sum, entry) =>
sum + (entry._2.size+100)
- } + (subAcks.size * 100)
+ } + (subAcks.size * 100) + xaAcks.foldLeft(0L){ case (sum, entry) =>
+ sum + entry.ack.length
+ }
class MessageAction {
var id:MessageId = _
@@ -215,6 +221,11 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
subAcks += SubAckRecord(sub.subKey, sub.lastAckPosition)
}
+ def xaAck(container:Long, seq:Long, ack:MessageAck) = {
+ var packet = manager.parent.wireFormat.marshal(ack)
+ xaAcks += XaAckRecord(container, seq, new Buffer(packet.data, packet.offset, packet.length))
+ }
+
def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean) = {
var delay = delay_enqueue && message.getTransactionId==null
if(delay ) {
@@ -264,8 +275,9 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
countDownFuture
}
- def dequeue(queueKey:Long, id:MessageId) = {
+ def dequeue(expectedQueueKey:Long, id:MessageId) = {
val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)];
+ assert(queueKey == expectedQueueKey)
val entry = QueueEntryRecord(id, queueKey, queueSeq)
this.synchronized {
getAction(id).dequeues += entry
@@ -626,11 +638,26 @@ class DBManager(val parent:LevelDBStore) {
nextPos
}
+ def getXAActions(key:Long) = {
+ val msgs = ListBuffer[Message]()
+ val acks = ListBuffer[MessageAck]()
+ println("transactionCursor")
+ client.transactionCursor(key) { command =>
+ println("recovered command: "+command)
+ command match {
+ case message:Message => msgs += message
+ case ack:MessageAck => acks += ack
+ }
+ true
+ }
+ (msgs, acks)
+ }
+
def queuePosition(id: MessageId):Long = {
id.getEntryLocator.asInstanceOf[(Long, Long)]._2
}
- def createQueueStore(dest:ActiveMQQueue):parent.LevelDBMessageStore = {
+ def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE))
}
def destroyQueueStore(key:Long) = writeExecutor.sync {
@@ -665,7 +692,7 @@ class DBManager(val parent:LevelDBStore) {
DurableSubscription(collection.getKey, topic_key, info)
}
- def removeSubscription(sub:DurableSubscription) = {
+ def removeSubscription(sub:DurableSubscription) {
client.removeCollection(sub.subKey)
}
@@ -686,7 +713,26 @@ class DBManager(val parent:LevelDBStore) {
}
collection.getKey
}
-
+
+ def createTransactionContainer(name:XATransactionId) = {
+ val collection = new CollectionRecord.Bean()
+ collection.setType(TRANSACTION_COLLECTION_TYPE)
+ var packet = parent.wireFormat.marshal(name)
+ collection.setMeta(new Buffer(packet.data, packet.offset, packet.length))
+ collection.setKey(lastCollectionKey.incrementAndGet())
+ val buffer = collection.freeze()
+ buffer.toFramedBuffer // eager encode the record.
+ writeExecutor.sync {
+ client.addCollection(buffer)
+ }
+ collection.getKey
+ }
+
+ def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
+ client.removeCollection(key)
+ }
+
+
def loadCollections = {
val collections = writeExecutor.sync {
client.listCollections
@@ -716,6 +762,11 @@ class DBManager(val parent:LevelDBStore) {
var sub = DurableSubscription(key, sr.getTopicKey, info)
sub.lastAckPosition = client.getAckPosition(key);
parent.createSubscription(sub)
+ case TRANSACTION_COLLECTION_TYPE =>
+ val meta = record.getMeta
+ val txid = parent.wireFormat.unmarshal(new ByteSequence(meta.data, meta.offset, meta.length)).asInstanceOf[XATransactionId]
+ val transaction = parent.transaction(txid)
+ transaction.xacontainer_id = key
case _ =>
}
}
diff --git a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 7ae1dc7e71..9861c8cd52 100755
--- a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -32,7 +32,7 @@ import java.util.concurrent._
import org.fusesource.hawtbuf._
import java.io.{ObjectInputStream, ObjectOutputStream, File}
import scala.Option._
-import org.apache.activemq.command.Message
+import org.apache.activemq.command.{MessageAck, DataStructure, Message}
import org.apache.activemq.util.ByteSequence
import org.apache.activemq.leveldb.RecordLog.LogInfo
import java.text.SimpleDateFormat
@@ -944,6 +944,23 @@ class LevelDBClient(store: LevelDBStore) {
}
}
+ def transactionCursor(collectionKey: Long)(func: (DataStructure)=>Boolean) = {
+ collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
+ val seq = decodeLong(key)
+ if( value.getMeta != null ) {
+ val data = value.getMeta
+ val ack = store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[MessageAck].asInstanceOf[MessageAck]
+ func(ack)
+ } else {
+ var locator = (value.getValueLocation, value.getValueLength)
+ val msg = getMessage(locator)
+ msg.getMessageId().setEntryLocator((collectionKey, seq))
+ msg.getMessageId().setDataLocator(locator)
+ func(msg)
+ }
+ }
+ }
+
def getAckPosition(subKey: Long): Long = {
retryUsingIndex {
index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION)).map{ value=>
@@ -1110,6 +1127,19 @@ class LevelDBClient(store: LevelDBStore) {
}
}
+
+ uow.xaAcks.foreach { entry =>
+ val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
+ val log_record = new EntryRecord.Bean()
+ log_record.setCollectionKey(entry.container)
+ log_record.setEntryKey(new Buffer(key, 9, 8))
+ log_record.setMeta(entry.ack)
+ appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+ val index_record = new EntryRecord.Bean()
+ index_record.setValue(entry.ack)
+ batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
+ }
+
uow.subAcks.foreach { entry =>
val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
val log_record = new EntryRecord.Bean()
diff --git a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 50473f4454..55b3cbe47b 100644
--- a/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -25,13 +25,13 @@ import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage
import java.io.File
import java.io.IOException
-import java.util.concurrent.ExecutionException
-import java.util.concurrent.Future
+import java.util.concurrent.{CountDownLatch, ExecutionException, Future}
import java.util.concurrent.atomic.AtomicLong
import reflect.BeanProperty
import org.apache.activemq.store._
import java.util._
-import scala.collection.mutable.ListBuffer
+import collection.mutable.ListBuffer
+import concurrent.CountDownLatch
import javax.management.ObjectName
import org.apache.activemq.broker.jmx.AnnotatedMBean
import org.apache.activemq.util._
@@ -217,6 +217,21 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
db.start
db.loadCollections
+
+ // Finish recovering the prepared XA transactions.
+ for( (txid, transaction) <- transactions ) {
+ assert( transaction.xacontainer_id != -1 )
+ val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
+ transaction.xarecovery = (msgs, acks)
+ for ( msg <- msgs ) {
+ transaction.add(createMessageStore(msg.getDestination), msg, false);
+ }
+ for ( ack <- acks ) {
+ // think we might have store design issue /w XA transactions and durable sub acks.
+ // does it even work for the other stores?
+ transaction.remove(createMessageStore(ack.getDestination), ack);
+ }
+ }
debug("started")
}
@@ -252,30 +267,84 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
val transactions = collection.mutable.HashMap[TransactionId, Transaction]()
trait TransactionAction {
- def apply(uow:DelayableUOW):Unit
+ def commit(uow:DelayableUOW):Unit
+ def prepare(uow:DelayableUOW):Unit
+ def rollback(uow:DelayableUOW):Unit
}
case class Transaction(id:TransactionId) {
- val commitActions = ListBuffer[TransactionAction]()
- def add(store:LevelDBMessageStore, message: Message, delay:Boolean) = {
+ val commitActions = ListBuffer[TransactionAction]()
+
+ val xaseqcounter: AtomicLong = new AtomicLong(0)
+ var xarecovery:(ListBuffer[Message], ListBuffer[MessageAck]) = null
+ var xacontainer_id = -1L
+
+ def prepared = xarecovery!=null
+ def prepare = {
+ if( !prepared ) {
+ val done = new CountDownLatch(1)
+ withUow { uow =>
+ xarecovery = (ListBuffer[Message](), ListBuffer[MessageAck]())
+ xacontainer_id = db.createTransactionContainer(id.asInstanceOf[XATransactionId])
+ for ( action <- commitActions ) {
+ action.prepare(uow)
+ }
+ uow.syncFlag = true
+ uow.addCompleteListener(done.countDown())
+ }
+ done.await()
+ }
+ }
+
+ def add(store:LevelDBStore#LevelDBMessageStore, message: Message, delay:Boolean) = {
commitActions += new TransactionAction() {
- def apply(uow:DelayableUOW) = {
+ def commit(uow:DelayableUOW) = {
+ if( prepared ) {
+ uow.dequeue(xacontainer_id, message.getMessageId)
+ }
store.doAdd(uow, message, delay)
}
+
+ def prepare(uow:DelayableUOW) = {
+ // add it to the xa container instead of the actual store container.
+ uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, message, delay)
+ xarecovery._1 += message
+ }
+
+ def rollback(uow:DelayableUOW) = {
+ if( prepared ) {
+ uow.dequeue(xacontainer_id, message.getMessageId)
+ }
+ }
+
}
}
- def remove(store:LevelDBMessageStore, msgid:MessageId) = {
+
+ def remove(store:LevelDBStore#LevelDBMessageStore, ack:MessageAck) = {
commitActions += new TransactionAction() {
- def apply(uow:DelayableUOW) = {
- store.doRemove(uow, msgid)
+ def commit(uow:DelayableUOW) = {
+ store.doRemove(uow, ack.getLastMessageId)
+ }
+ def prepare(uow:DelayableUOW) = {
+ // add it to the xa container instead of the actual store container.
+ uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
+ xarecovery._2 += ack
+ }
+
+ def rollback(uow: DelayableUOW) {
}
}
}
- def updateAckPosition(store:LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
+
+ def updateAckPosition(store:LevelDBStore#LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
commitActions += new TransactionAction() {
- def apply(uow:DelayableUOW) = {
+ def commit(uow:DelayableUOW) = {
store.doUpdateAckPosition(uow, sub, position)
}
+ def prepare(uow:DelayableUOW) = {
+ }
+ def rollback(uow: DelayableUOW) {
+ }
}
}
}
@@ -289,12 +358,19 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
println("The transaction does not exist")
postCommit.run()
case Some(tx)=>
+ val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions ) {
- action(uow)
+ action.commit(uow)
}
- uow.addCompleteListener( postCommit.run() )
+ uow.syncFlag = true
+ uow.addCompleteListener { done.countDown() }
}
+ done.await()
+ if( tx.prepared ) {
+ db.removeTransactionContainer(tx.xacontainer_id)
+ }
+ postCommit.run()
}
}
@@ -303,20 +379,44 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
case None=>
println("The transaction does not exist")
case Some(tx)=>
+ if( tx.prepared ) {
+ db.removeTransactionContainer(tx.xacontainer_id)
+ }
}
}
def prepare(tx: TransactionId) = {
- sys.error("XA transactions not yet supported.")
- }
- def recover(listener: TransactionRecoveryListener) = {
+ transactions.get(tx) match {
+ case None=>
+ println("The transaction does not exist")
+ case Some(tx)=>
+ tx.prepare
+ }
}
- def createQueueMessageStore(destination: ActiveMQQueue) = {
+ def recover(listener: TransactionRecoveryListener) = {
+ for ( (txid, transaction) <- transactions ) {
+ if( transaction.prepared ) {
+ val (msgs, acks) = transaction.xarecovery
+ listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
+ }
+ }
+ }
+
+ def createMessageStore(destination: ActiveMQDestination):LevelDBStore#LevelDBMessageStore = {
+ destination match {
+ case destination:ActiveMQQueue =>
+ createQueueMessageStore(destination)
+ case destination:ActiveMQTopic =>
+ createTopicMessageStore(destination)
+ }
+ }
+
+ def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
}
- def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBMessageStore = {
+ def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore = {
var rc = new LevelDBMessageStore(destination, key)
this.synchronized {
queues.put(destination, rc)
@@ -330,11 +430,11 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
}
}
- def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore = {
+ def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore = {
this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
}
- def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBTopicMessageStore = {
+ def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore = {
var rc = new LevelDBTopicMessageStore(destination, key)
this synchronized {
topics.put(destination, rc)
@@ -421,7 +521,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = {
if( ack.getTransactionId!=null ) {
- transaction(ack.getTransactionId).remove(this, ack.getLastMessageId)
+ transaction(ack.getTransactionId).remove(this, ack)
DONE
} else {
waitOn(withUow{uow=>
diff --git a/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java b/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
new file mode 100644
index 0000000000..fa6ddf22dd
--- /dev/null
+++ b/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
@@ -0,0 +1,47 @@
+package org.apache.activemq.leveldb;
+
+import junit.framework.Test;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.XARecoveryBrokerTest;
+
+import java.io.File;
+
+/**
+ * @author Hiram Chirino
+ */
+public class LevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+ public static Test suite() {
+ return suite(LevelDBXARecoveryBrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ @Override
+ protected void configureBroker(BrokerService broker) throws Exception {
+ super.configureBroker(broker);
+ LevelDBStore store = new LevelDBStore();
+ store.setDirectory(new File("target/activemq-data/xahaleveldb"));
+ broker.setPersistenceAdapter(store);
+ }
+
+ // TODO: The following test cases are failing...
+
+ @Override
+ public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
+ }
+
+ @Override
+ public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+ }
+
+ @Override
+ public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+ }
+
+ @Override
+ public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
+ }
+}