Implementing AMQ-4134: Add XA support to the LevelDB store.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1401881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-24 21:09:16 +00:00
parent ba5fdc5915
commit ebd3d34e09
5 changed files with 267 additions and 29 deletions

View File

@ -182,6 +182,16 @@
</dependency> </dependency>
<!-- Testing Dependencies --> <!-- Testing Dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId> <artifactId>activemq-core</artifactId>

View File

@ -31,6 +31,7 @@ import org.apache.activemq.leveldb.record.{SubscriptionRecord, CollectionRecord}
import util.TimeMetric import util.TimeMetric
import java.util.HashMap import java.util.HashMap
import collection.mutable.{HashSet, ListBuffer} import collection.mutable.{HashSet, ListBuffer}
import org.apache.activemq.util.ByteSequence
case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) { case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
var locator:(Long, Int) = _ var locator:(Long, Int) = _
@ -40,6 +41,7 @@ case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
case class QueueRecord(id:ActiveMQDestination, queue_key:Long) case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange() case class QueueEntryRange()
case class SubAckRecord(subKey:Long, ackPosition:Long) case class SubAckRecord(subKey:Long, ackPosition:Long)
case class XaAckRecord(container:Long, seq:Long, ack:Buffer)
sealed trait UowState { sealed trait UowState {
def stage:Int def stage:Int
@ -128,6 +130,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val uowId:Int = manager.lastUowId.incrementAndGet() val uowId:Int = manager.lastUowId.incrementAndGet()
var actions = Map[MessageId, MessageAction]() var actions = Map[MessageId, MessageAction]()
var subAcks = ListBuffer[SubAckRecord]() var subAcks = ListBuffer[SubAckRecord]()
var xaAcks = ListBuffer[XaAckRecord]()
var completed = false var completed = false
var disableDelay = false var disableDelay = false
var delayableActions = 0 var delayableActions = 0
@ -140,10 +143,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
this._state = next this._state = next
} }
def syncNeeded = actions.find( _._2.syncNeeded ).isDefined var syncFlag = false
def syncNeeded = syncFlag || actions.find( _._2.syncNeeded ).isDefined
def size = 100+actions.foldLeft(0L){ case (sum, entry) => def size = 100+actions.foldLeft(0L){ case (sum, entry) =>
sum + (entry._2.size+100) sum + (entry._2.size+100)
} + (subAcks.size * 100) } + (subAcks.size * 100) + xaAcks.foldLeft(0L){ case (sum, entry) =>
sum + entry.ack.length
}
class MessageAction { class MessageAction {
var id:MessageId = _ var id:MessageId = _
@ -215,6 +221,11 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
subAcks += SubAckRecord(sub.subKey, sub.lastAckPosition) subAcks += SubAckRecord(sub.subKey, sub.lastAckPosition)
} }
def xaAck(container:Long, seq:Long, ack:MessageAck) = {
var packet = manager.parent.wireFormat.marshal(ack)
xaAcks += XaAckRecord(container, seq, new Buffer(packet.data, packet.offset, packet.length))
}
def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean) = { def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean) = {
var delay = delay_enqueue && message.getTransactionId==null var delay = delay_enqueue && message.getTransactionId==null
if(delay ) { if(delay ) {
@ -264,8 +275,9 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
countDownFuture countDownFuture
} }
def dequeue(queueKey:Long, id:MessageId) = { def dequeue(expectedQueueKey:Long, id:MessageId) = {
val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)]; val (queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[(Long, Long)];
assert(queueKey == expectedQueueKey)
val entry = QueueEntryRecord(id, queueKey, queueSeq) val entry = QueueEntryRecord(id, queueKey, queueSeq)
this.synchronized { this.synchronized {
getAction(id).dequeues += entry getAction(id).dequeues += entry
@ -626,11 +638,26 @@ class DBManager(val parent:LevelDBStore) {
nextPos nextPos
} }
def getXAActions(key:Long) = {
val msgs = ListBuffer[Message]()
val acks = ListBuffer[MessageAck]()
println("transactionCursor")
client.transactionCursor(key) { command =>
println("recovered command: "+command)
command match {
case message:Message => msgs += message
case ack:MessageAck => acks += ack
}
true
}
(msgs, acks)
}
def queuePosition(id: MessageId):Long = { def queuePosition(id: MessageId):Long = {
id.getEntryLocator.asInstanceOf[(Long, Long)]._2 id.getEntryLocator.asInstanceOf[(Long, Long)]._2
} }
def createQueueStore(dest:ActiveMQQueue):parent.LevelDBMessageStore = { def createQueueStore(dest:ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE)) parent.createQueueMessageStore(dest, createStore(dest, QUEUE_COLLECTION_TYPE))
} }
def destroyQueueStore(key:Long) = writeExecutor.sync { def destroyQueueStore(key:Long) = writeExecutor.sync {
@ -665,7 +692,7 @@ class DBManager(val parent:LevelDBStore) {
DurableSubscription(collection.getKey, topic_key, info) DurableSubscription(collection.getKey, topic_key, info)
} }
def removeSubscription(sub:DurableSubscription) = { def removeSubscription(sub:DurableSubscription) {
client.removeCollection(sub.subKey) client.removeCollection(sub.subKey)
} }
@ -687,6 +714,25 @@ class DBManager(val parent:LevelDBStore) {
collection.getKey collection.getKey
} }
def createTransactionContainer(name:XATransactionId) = {
val collection = new CollectionRecord.Bean()
collection.setType(TRANSACTION_COLLECTION_TYPE)
var packet = parent.wireFormat.marshal(name)
collection.setMeta(new Buffer(packet.data, packet.offset, packet.length))
collection.setKey(lastCollectionKey.incrementAndGet())
val buffer = collection.freeze()
buffer.toFramedBuffer // eager encode the record.
writeExecutor.sync {
client.addCollection(buffer)
}
collection.getKey
}
def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
client.removeCollection(key)
}
def loadCollections = { def loadCollections = {
val collections = writeExecutor.sync { val collections = writeExecutor.sync {
client.listCollections client.listCollections
@ -716,6 +762,11 @@ class DBManager(val parent:LevelDBStore) {
var sub = DurableSubscription(key, sr.getTopicKey, info) var sub = DurableSubscription(key, sr.getTopicKey, info)
sub.lastAckPosition = client.getAckPosition(key); sub.lastAckPosition = client.getAckPosition(key);
parent.createSubscription(sub) parent.createSubscription(sub)
case TRANSACTION_COLLECTION_TYPE =>
val meta = record.getMeta
val txid = parent.wireFormat.unmarshal(new ByteSequence(meta.data, meta.offset, meta.length)).asInstanceOf[XATransactionId]
val transaction = parent.transaction(txid)
transaction.xacontainer_id = key
case _ => case _ =>
} }
} }

View File

@ -32,7 +32,7 @@ import java.util.concurrent._
import org.fusesource.hawtbuf._ import org.fusesource.hawtbuf._
import java.io.{ObjectInputStream, ObjectOutputStream, File} import java.io.{ObjectInputStream, ObjectOutputStream, File}
import scala.Option._ import scala.Option._
import org.apache.activemq.command.Message import org.apache.activemq.command.{MessageAck, DataStructure, Message}
import org.apache.activemq.util.ByteSequence import org.apache.activemq.util.ByteSequence
import org.apache.activemq.leveldb.RecordLog.LogInfo import org.apache.activemq.leveldb.RecordLog.LogInfo
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
@ -944,6 +944,23 @@ class LevelDBClient(store: LevelDBStore) {
} }
} }
def transactionCursor(collectionKey: Long)(func: (DataStructure)=>Boolean) = {
collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
val seq = decodeLong(key)
if( value.getMeta != null ) {
val data = value.getMeta
val ack = store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[MessageAck].asInstanceOf[MessageAck]
func(ack)
} else {
var locator = (value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
msg.getMessageId().setEntryLocator((collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
func(msg)
}
}
}
def getAckPosition(subKey: Long): Long = { def getAckPosition(subKey: Long): Long = {
retryUsingIndex { retryUsingIndex {
index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION)).map{ value=> index.get(encodeEntryKey(ENTRY_PREFIX, subKey, ACK_POSITION)).map{ value=>
@ -1110,6 +1127,19 @@ class LevelDBClient(store: LevelDBStore) {
} }
} }
uow.xaAcks.foreach { entry =>
val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
val log_record = new EntryRecord.Bean()
log_record.setCollectionKey(entry.container)
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setMeta(entry.ack)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValue(entry.ack)
batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
uow.subAcks.foreach { entry => uow.subAcks.foreach { entry =>
val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION) val key = encodeEntryKey(ENTRY_PREFIX, entry.subKey, ACK_POSITION)
val log_record = new EntryRecord.Bean() val log_record = new EntryRecord.Bean()

View File

@ -25,13 +25,13 @@ import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage import org.apache.activemq.usage.SystemUsage
import java.io.File import java.io.File
import java.io.IOException import java.io.IOException
import java.util.concurrent.ExecutionException import java.util.concurrent.{CountDownLatch, ExecutionException, Future}
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import reflect.BeanProperty import reflect.BeanProperty
import org.apache.activemq.store._ import org.apache.activemq.store._
import java.util._ import java.util._
import scala.collection.mutable.ListBuffer import collection.mutable.ListBuffer
import concurrent.CountDownLatch
import javax.management.ObjectName import javax.management.ObjectName
import org.apache.activemq.broker.jmx.AnnotatedMBean import org.apache.activemq.broker.jmx.AnnotatedMBean
import org.apache.activemq.util._ import org.apache.activemq.util._
@ -217,6 +217,21 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
db.start db.start
db.loadCollections db.loadCollections
// Finish recovering the prepared XA transactions.
for( (txid, transaction) <- transactions ) {
assert( transaction.xacontainer_id != -1 )
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
transaction.xarecovery = (msgs, acks)
for ( msg <- msgs ) {
transaction.add(createMessageStore(msg.getDestination), msg, false);
}
for ( ack <- acks ) {
// think we might have store design issue /w XA transactions and durable sub acks.
// does it even work for the other stores?
transaction.remove(createMessageStore(ack.getDestination), ack);
}
}
debug("started") debug("started")
} }
@ -252,30 +267,84 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
val transactions = collection.mutable.HashMap[TransactionId, Transaction]() val transactions = collection.mutable.HashMap[TransactionId, Transaction]()
trait TransactionAction { trait TransactionAction {
def apply(uow:DelayableUOW):Unit def commit(uow:DelayableUOW):Unit
def prepare(uow:DelayableUOW):Unit
def rollback(uow:DelayableUOW):Unit
} }
case class Transaction(id:TransactionId) { case class Transaction(id:TransactionId) {
val commitActions = ListBuffer[TransactionAction]() val commitActions = ListBuffer[TransactionAction]()
def add(store:LevelDBMessageStore, message: Message, delay:Boolean) = {
val xaseqcounter: AtomicLong = new AtomicLong(0)
var xarecovery:(ListBuffer[Message], ListBuffer[MessageAck]) = null
var xacontainer_id = -1L
def prepared = xarecovery!=null
def prepare = {
if( !prepared ) {
val done = new CountDownLatch(1)
withUow { uow =>
xarecovery = (ListBuffer[Message](), ListBuffer[MessageAck]())
xacontainer_id = db.createTransactionContainer(id.asInstanceOf[XATransactionId])
for ( action <- commitActions ) {
action.prepare(uow)
}
uow.syncFlag = true
uow.addCompleteListener(done.countDown())
}
done.await()
}
}
def add(store:LevelDBStore#LevelDBMessageStore, message: Message, delay:Boolean) = {
commitActions += new TransactionAction() { commitActions += new TransactionAction() {
def apply(uow:DelayableUOW) = { def commit(uow:DelayableUOW) = {
if( prepared ) {
uow.dequeue(xacontainer_id, message.getMessageId)
}
store.doAdd(uow, message, delay) store.doAdd(uow, message, delay)
} }
def prepare(uow:DelayableUOW) = {
// add it to the xa container instead of the actual store container.
uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, message, delay)
xarecovery._1 += message
}
def rollback(uow:DelayableUOW) = {
if( prepared ) {
uow.dequeue(xacontainer_id, message.getMessageId)
}
}
} }
} }
def remove(store:LevelDBMessageStore, msgid:MessageId) = {
def remove(store:LevelDBStore#LevelDBMessageStore, ack:MessageAck) = {
commitActions += new TransactionAction() { commitActions += new TransactionAction() {
def apply(uow:DelayableUOW) = { def commit(uow:DelayableUOW) = {
store.doRemove(uow, msgid) store.doRemove(uow, ack.getLastMessageId)
}
def prepare(uow:DelayableUOW) = {
// add it to the xa container instead of the actual store container.
uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
xarecovery._2 += ack
}
def rollback(uow: DelayableUOW) {
} }
} }
} }
def updateAckPosition(store:LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
def updateAckPosition(store:LevelDBStore#LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
commitActions += new TransactionAction() { commitActions += new TransactionAction() {
def apply(uow:DelayableUOW) = { def commit(uow:DelayableUOW) = {
store.doUpdateAckPosition(uow, sub, position) store.doUpdateAckPosition(uow, sub, position)
} }
def prepare(uow:DelayableUOW) = {
}
def rollback(uow: DelayableUOW) {
}
} }
} }
} }
@ -289,12 +358,19 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
println("The transaction does not exist") println("The transaction does not exist")
postCommit.run() postCommit.run()
case Some(tx)=> case Some(tx)=>
val done = new CountDownLatch(1)
withUow { uow => withUow { uow =>
for( action <- tx.commitActions ) { for( action <- tx.commitActions ) {
action(uow) action.commit(uow)
} }
uow.addCompleteListener( postCommit.run() ) uow.syncFlag = true
uow.addCompleteListener { done.countDown() }
} }
done.await()
if( tx.prepared ) {
db.removeTransactionContainer(tx.xacontainer_id)
}
postCommit.run()
} }
} }
@ -303,20 +379,44 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
case None=> case None=>
println("The transaction does not exist") println("The transaction does not exist")
case Some(tx)=> case Some(tx)=>
if( tx.prepared ) {
db.removeTransactionContainer(tx.xacontainer_id)
}
} }
} }
def prepare(tx: TransactionId) = { def prepare(tx: TransactionId) = {
sys.error("XA transactions not yet supported.") transactions.get(tx) match {
} case None=>
def recover(listener: TransactionRecoveryListener) = { println("The transaction does not exist")
case Some(tx)=>
tx.prepare
}
} }
def createQueueMessageStore(destination: ActiveMQQueue) = { def recover(listener: TransactionRecoveryListener) = {
for ( (txid, transaction) <- transactions ) {
if( transaction.prepared ) {
val (msgs, acks) = transaction.xarecovery
listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
}
}
}
def createMessageStore(destination: ActiveMQDestination):LevelDBStore#LevelDBMessageStore = {
destination match {
case destination:ActiveMQQueue =>
createQueueMessageStore(destination)
case destination:ActiveMQTopic =>
createTopicMessageStore(destination)
}
}
def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination)) this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
} }
def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBMessageStore = { def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore = {
var rc = new LevelDBMessageStore(destination, key) var rc = new LevelDBMessageStore(destination, key)
this.synchronized { this.synchronized {
queues.put(destination, rc) queues.put(destination, rc)
@ -330,11 +430,11 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
} }
} }
def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore = { def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore = {
this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination)) this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
} }
def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBTopicMessageStore = { def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore = {
var rc = new LevelDBTopicMessageStore(destination, key) var rc = new LevelDBTopicMessageStore(destination, key)
this synchronized { this synchronized {
topics.put(destination, rc) topics.put(destination, rc)
@ -421,7 +521,7 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = { override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = {
if( ack.getTransactionId!=null ) { if( ack.getTransactionId!=null ) {
transaction(ack.getTransactionId).remove(this, ack.getLastMessageId) transaction(ack.getTransactionId).remove(this, ack)
DONE DONE
} else { } else {
waitOn(withUow{uow=> waitOn(withUow{uow=>

View File

@ -0,0 +1,47 @@
package org.apache.activemq.leveldb;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest;
import java.io.File;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class LevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
public static Test suite() {
return suite(LevelDBXARecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
@Override
protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker);
LevelDBStore store = new LevelDBStore();
store.setDirectory(new File("target/activemq-data/xahaleveldb"));
broker.setPersistenceAdapter(store);
}
// TODO: The following test cases are failing...
@Override
public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
}
@Override
public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
}
@Override
public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
}
@Override
public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
}
}