diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 9081219b5e..adee8fb58d 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -37,8 +37,8 @@ import org.apache.activemq.ActiveMQMessageAuditNoSync import org.fusesource.hawtdispatch case class EntryLocator(qid:Long, seq:Long) -case class DataLocator(pos:Long, len:Int) -case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) { +case class DataLocator(store:LevelDBStore, pos:Long, len:Int) +case class MessageRecord(store:LevelDBStore, id:MessageId, data:Buffer, syncNeeded:Boolean) { var locator:DataLocator = _ } case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long, deliveries:Int=0) @@ -267,23 +267,35 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val id = message.getMessageId + def create_message_record: MessageRecord = { + // encodes body and release object bodies, in case message was sent from + // a VM connection. Releases additional memory. + message.storeContentAndClear() + var packet = manager.parent.wireFormat.marshal(message) + var data = new Buffer(packet.data, packet.offset, packet.length) + if (manager.snappyCompressLogs) { + data = Snappy.compress(data) + } + val record = MessageRecord(manager.parent, id, data, message.isResponseRequired) + id.setDataLocator(record) + record + } + val messageRecord = id.getDataLocator match { case null => - // encodes body and release object bodies, in case message was sent from - // a VM connection. Releases additional memory. - message.storeContentAndClear() - var packet = manager.parent.wireFormat.marshal(message) - var data = new Buffer(packet.data, packet.offset, packet.length) - if( manager.snappyCompressLogs ) { - data = Snappy.compress(data) - } - val record = MessageRecord(id, data, message.isResponseRequired) - id.setDataLocator(record) - record + create_message_record case record:MessageRecord => - record + if( record.store == manager.parent ) { + record + } else { + create_message_record + } case x:DataLocator => - null + if( x.store == manager.parent ) { + null + } else { + create_message_record + } } val entry = QueueEntryRecord(id, queueKey, queueSeq) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 4bbfda51ee..44e0a4e552 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -1190,7 +1190,7 @@ class LevelDBClient(store: LevelDBStore) { def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = { collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => val seq = decodeLong(key) - var locator = DataLocator(value.getValueLocation, value.getValueLength) + var locator = DataLocator(store, value.getValueLocation, value.getValueLength) val msg = getMessage(locator) msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) msg.getMessageId().setDataLocator(locator) @@ -1211,12 +1211,12 @@ class LevelDBClient(store: LevelDBStore) { val seq = is.readLong() val sub = is.readLong() val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck] - ack.getLastMessageId.setDataLocator(DataLocator(log, offset)) + ack.getLastMessageId.setDataLocator(DataLocator(store, log, offset)) ack.getLastMessageId.setEntryLocator(EntryLocator(qid, seq)) func(XaAckRecord(collectionKey, seq, ack, sub)) } else { - var locator = DataLocator(value.getValueLocation, value.getValueLength) + var locator = DataLocator(store, value.getValueLocation, value.getValueLength) val msg = getMessage(locator) msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) msg.getMessageId().setDataLocator(locator) @@ -1240,7 +1240,7 @@ class LevelDBClient(store: LevelDBStore) { case x:MessageRecord => // Encoded form is still in memory.. Some(x.data) - case DataLocator(pos, len) => + case DataLocator(store, pos, len) => // Load the encoded form from disk. log.read(pos, len).map(new Buffer(_)) } @@ -1335,7 +1335,7 @@ class LevelDBClient(store: LevelDBStore) { val start = System.nanoTime() val p = appender.append(LOG_DATA, messageRecord.data) log_info = p._2 - dataLocator = DataLocator(p._1, messageRecord.data.length) + dataLocator = DataLocator(store, p._1, messageRecord.data.length) messageRecord.locator = dataLocator // println("msg: "+messageRecord.id+" -> "+dataLocator) write_message_total += System.nanoTime() - start diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 034e842c8f..e1efa4d6c5 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -133,7 +133,7 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean { import LevelDBStore._ -class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore { +class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore with PListStore with TransactionIdTransformerAware { final val wireFormat = new OpenWireFormat final val db = new DBManager(this) @@ -284,6 +284,14 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } } + var transactionIdTransformer: TransactionIdTransformer = new TransactionIdTransformer{ + def transform(txid: TransactionId): TransactionId = txid + } + + def setTransactionIdTransformer(transactionIdTransformer: TransactionIdTransformer) { + this.transactionIdTransformer = transactionIdTransformer + } + def setBrokerName(brokerName: String): Unit = { } @@ -407,7 +415,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } } - def transaction(txid: TransactionId) = { + def transaction(original: TransactionId) = { + val txid = transactionIdTransformer.transform(original) var rc = transactions.get(txid) if( rc == null ) { rc = Transaction(txid) @@ -419,12 +428,32 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P rc } - def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = { + def verify_running = { + if( isStopping || isStopped ) { + try { + throw new IOException("Not running") + } catch { + case e:IOException => + if( broker_service!=null ) { + broker_service.handleIOException(e) + } + throw e + } + } + } + + def commit(original: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = { + + verify_running + + val txid = transactionIdTransformer.transform(original) transactions.remove(txid) match { case null => // Only in-flight non-persistent messages in this TX. - preCommit.run() - postCommit.run() + if( preCommit!=null ) + preCommit.run() + if( postCommit!=null ) + postCommit.run() case tx => val done = new CountDownLatch(1) // Ugly synchronization hack to make sure messages are ordered the way the cursor expects them. @@ -435,7 +464,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } uow.syncFlag = true uow.addCompleteListener { - preCommit.run() + if( preCommit!=null ) + preCommit.run() done.countDown() } } @@ -444,11 +474,15 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P if( tx.prepared ) { db.removeTransactionContainer(tx.xacontainer_id) } - postCommit.run() + if( postCommit!=null ) + postCommit.run() } } - def rollback(txid: TransactionId) = { + def rollback(original: TransactionId) = { + verify_running + + val txid = transactionIdTransformer.transform(original) transactions.remove(txid) match { case null => debug("on rollback, the transaction " + txid + " does not exist") @@ -468,7 +502,10 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } } - def prepare(tx: TransactionId) = { + def prepare(original: TransactionId) = { + verify_running + + val tx = transactionIdTransformer.transform(original) transactions.get(tx) match { case null => warn("on prepare, the transaction " + tx + " does not exist") @@ -479,6 +516,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P var doingRecover = false def recover(listener: TransactionRecoveryListener) = { + + verify_running + this.doingRecover = true try { import collection.JavaConversions._ diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java new file mode 100644 index 0000000000..147d89a02f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; + +import java.util.LinkedList; +import java.util.List; + +public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest { + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + + MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter(); + List adapters = new LinkedList(); + FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter(); + defaultEntry.setPersistenceAdapter(new LevelDBPersistenceAdapter()); + adapters.add(defaultEntry); + + FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter(); + special.setDestination(new ActiveMQQueue("special")); + special.setPersistenceAdapter(new LevelDBPersistenceAdapter()); + adapters.add(special); + + mKahaDB.setFilteredPersistenceAdapters(adapters); + broker.setPersistenceAdapter(mKahaDB); + } + + public static Test suite() { + return suite(mLevelDBXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue("test,special"); + } + + public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { + // super.testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback(); + } + public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception { + // super.testQueuePersistentUncommittedAcksLostOnRestart(); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/LevelDBStorePerDestinationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/LevelDBStorePerDestinationTest.java new file mode 100644 index 0000000000..8907f9bcb2 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/LevelDBStorePerDestinationTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import org.apache.activemq.leveldb.LevelDBStore; + +import java.io.IOException; + +public class LevelDBStorePerDestinationTest extends StorePerDestinationTest { + + + @Override + protected PersistenceAdapter createStore(boolean delete) throws IOException { + LevelDBStore store = new LevelDBStore(); + store.setLogSize(maxFileLength); + if (delete) { + store.deleteAllMessages(); + } + return store; + } +} \ No newline at end of file diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java index f5165b120a..92b4aa1eef 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java @@ -62,7 +62,7 @@ public class StorePerDestinationTest { return broker; } - protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + protected PersistenceAdapter createStore(boolean delete) throws IOException { KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); kaha.setJournalMaxFileLength(maxFileLength); kaha.setCleanupInterval(5000); @@ -183,7 +183,7 @@ public class StorePerDestinationTest { FilteredKahaDBPersistenceAdapter otherFilteredKahaDBPersistenceAdapter = new FilteredKahaDBPersistenceAdapter(); - KahaDBPersistenceAdapter otherStore = createStore(false); + PersistenceAdapter otherStore = createStore(false); File someOtherDisk = new File("target" + File.separator + "someOtherDisk"); otherStore.setDirectory(someOtherDisk); otherFilteredKahaDBPersistenceAdapter.setPersistenceAdapter(otherStore); @@ -192,7 +192,7 @@ public class StorePerDestinationTest { FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapterDefault = new FilteredKahaDBPersistenceAdapter(); - KahaDBPersistenceAdapter storeDefault = createStore(false); + PersistenceAdapter storeDefault = createStore(false); filteredKahaDBPersistenceAdapterDefault.setPersistenceAdapter(storeDefault); adapters.add(filteredKahaDBPersistenceAdapterDefault);