diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 43daff24d5..75a68c7883 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -21,7 +21,6 @@ import java.sql.SQLException; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -73,7 +72,7 @@ public class JDBCMessageStore extends AbstractMessageStore { protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); final Set recoveredAdditions = new LinkedHashSet(); protected ActiveMQMessageAudit audit; - protected final List pendingAdditions = new LinkedList(); + protected final LinkedList pendingAdditions = new LinkedList(); public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { super(destination); @@ -131,7 +130,8 @@ public class JDBCMessageStore extends AbstractMessageStore { pendingAdditions.add(sequence); c.onCompletion(new Runnable() { public void run() { - // message added to db + // jdbc close or jms commit - while futureOrSequenceLong==null ordered + // work will remain pending on the Queue message.getMessageId().setFutureOrSequenceLong(sequence); message.getMessageId().setEntryLocator(sequence); } @@ -341,6 +341,9 @@ public class JDBCMessageStore extends AbstractMessageStore { } } } + if (LOG.isTraceEnabled()) { + LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId()); + } adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { @@ -376,7 +379,7 @@ public class JDBCMessageStore extends AbstractMessageStore { */ public void resetBatching() { if (LOG.isTraceEnabled()) { - LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); + LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); } lastRecoveredSequenceId.set(-1); lastRecoveredPriority.set(Byte.MAX_VALUE - 1); @@ -394,7 +397,7 @@ public class JDBCMessageStore extends AbstractMessageStore { lastRecoveredPriority.set(Byte.MAX_VALUE -1); } if (LOG.isTraceEnabled()) { - LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() + LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() + ", priority: " + lastRecoveredPriority.get()); } } @@ -403,4 +406,9 @@ public class JDBCMessageStore extends AbstractMessageStore { public void setPrioritizedMessages(boolean prioritizedMessages) { super.setPrioritizedMessages(prioritizedMessages); } + + @Override + public String toString() { + return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size(); + } } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 09c1378d69..b0051ccc57 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -104,7 +104,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] { var value:T = _ var error:Throwable = _ var listener:Runnable = _ - var id:MessageId = _ def cancel(mayInterruptIfRunning: Boolean) = false def isCancelled = false @@ -116,9 +115,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] { def set(v:T) = { value = v - if (id != null) { - id.setFutureOrSequenceLong(id.getEntryLocator.asInstanceOf[EntryLocator].seq) - } latch.countDown() fireListener } @@ -330,14 +326,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val entry = QueueEntryRecord(id, queueKey, queueSeq) assert(id.getEntryLocator == null) id.setEntryLocator(EntryLocator(queueKey, queueSeq)) - if (message.getTransactionId!=null) { - // why does future not get set in tx? - id.setFutureOrSequenceLong(queueSeq) - } else { - id.setFutureOrSequenceLong(countDownFuture) - message.setRecievedByDFBridge(true) - countDownFuture.id = id - } val a = this.synchronized { if( !delay ) @@ -741,10 +729,10 @@ class DBManager(val parent:LevelDBStore) { client.collectionIsEmpty(key) } - def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = { + def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, endPos:Long=Long.MaxValue, max:Long=Long.MaxValue) = { var lastmsgid:MessageId = null var count = 0L - client.queueCursor(key, startPos) { msg => + client.queueCursor(key, startPos, endPos) { msg => if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) { lastmsgid = msg.getMessageId count += 1 diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 3d6178bcea..64bbcee11a 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -1255,8 +1255,8 @@ class LevelDBClient(store: LevelDBStore) { return rc } - def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = { - collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => + def queueCursor(collectionKey: Long, seq:Long, endSeq:Long)(func: (Message)=>Boolean) = { + collectionCursor(collectionKey, encodeLong(seq), encodeLong(endSeq)) { (key, value) => val seq = decodeLong(key) var locator = DataLocator(store, value.getValueLocation, value.getValueLength) val msg = getMessage(locator) @@ -1273,7 +1273,7 @@ class LevelDBClient(store: LevelDBStore) { } def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = { - collectionCursor(collectionKey, encodeLong(0)) { (key, value) => + collectionCursor(collectionKey, encodeLong(0), encodeLong(Long.MaxValue)) { (key, value) => val seq = decodeLong(key) if( value.getMeta != null ) { @@ -1336,12 +1336,12 @@ class LevelDBClient(store: LevelDBStore) { store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message] } - def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = { + def collectionCursor(collectionKey: Long, cursorPosition:Buffer, endCursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = { val ro = new ReadOptions ro.fillCache(true) ro.verifyChecksums(verifyChecksums) val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition) - val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1) + val end = encodeEntryKey(ENTRY_PREFIX, collectionKey, endCursorPosition) might_fail_using_index { index.cursorRange(start, end, ro) { case (key, value) => func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value)) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 451bc04643..52a785a9f2 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -35,7 +35,8 @@ import org.apache.activemq.leveldb.util.Log import org.apache.activemq.store.PList.PListIterator import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream} import org.fusesource.hawtdispatch; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.broker.scheduler.JobSchedulerStore +import org.apache.activemq.store.IndexListener.MessageContext object LevelDBStore extends Log { val DEFAULT_DIRECTORY = new File("LevelDB"); @@ -245,7 +246,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val (msgs, acks) = db.getXAActions(transaction.xacontainer_id) transaction.xarecovery = (msgs, acks.map(_.ack)) for ( msg <- msgs ) { - transaction.add(createMessageStore(msg.getDestination), new IndexListener.MessageContext(null, msg, null), false); + transaction.add(createMessageStore(msg.getDestination), null, msg, false); } for ( record <- acks ) { var ack = record.ack @@ -348,27 +349,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } } - def add(store:LevelDBStore#LevelDBMessageStore, messageContext:IndexListener.MessageContext, delay:Boolean) = { + def add(store:LevelDBStore#LevelDBMessageStore, context: ConnectionContext, message: Message, delay:Boolean) = { commitActions += new TransactionAction() { def commit(uow:DelayableUOW) = { if( prepared ) { - uow.dequeue(xacontainer_id, messageContext.message.getMessageId) + uow.dequeue(xacontainer_id, message.getMessageId) } - var copy = messageContext.message.getMessageId.copy() + var copy = message.getMessageId.copy() copy.setEntryLocator(null) - messageContext.message.setMessageId(copy) - store.doAdd(uow, messageContext, delay) + message.setMessageId(copy) + store.doAdd(uow, context, message, delay) } def prepare(uow:DelayableUOW) = { // add it to the xa container instead of the actual store container. - uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, messageContext.message, delay) - xarecovery._1 += messageContext.message + uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, message, delay) + xarecovery._1 += message } def rollback(uow:DelayableUOW) = { if( prepared ) { - uow.dequeue(xacontainer_id, messageContext.message.getMessageId) + uow.dequeue(xacontainer_id, message.getMessageId) } } @@ -671,23 +672,29 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val lastSeq: AtomicLong = new AtomicLong(0) protected var cursorPosition: Long = 0 val preparedAcks = new HashSet[MessageId]() - + val pendingCursorAdds = new LinkedList[Long]() lastSeq.set(db.getLastQueueEntrySeq(key)) def cursorResetPosition = 0L - def doAdd(uow: DelayableUOW, messageContext:IndexListener.MessageContext, delay:Boolean): CountDownFuture[AnyRef] = { + def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { check_running - val seq = lastSeq.incrementAndGet() - messageContext.message.incrementReferenceCount() + message.incrementReferenceCount() uow.addCompleteListener({ - messageContext.message.decrementReferenceCount() + message.decrementReferenceCount() }) - val future = uow.enqueue(key, seq, messageContext.message, delay) - if (indexListener != null) { - indexListener.onAdd(messageContext) + val sequence = lastSeq.synchronized { + val seq = lastSeq.incrementAndGet() + message.getMessageId.setFutureOrSequenceLong(seq); + if (indexListener != null) { + pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) } + indexListener.onAdd(new MessageContext(context, message, new Runnable { + def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) } + })) + } + seq } - future + uow.enqueue(key, sequence, message, delay) } override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false) @@ -695,11 +702,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P check_running message.getMessageId.setEntryLocator(null) if( message.getTransactionId!=null ) { - transaction(message.getTransactionId).add(this, new IndexListener.MessageContext(context, message, null), delay) + transaction(message.getTransactionId).add(this, context, message, delay) DONE } else { withUow { uow=> - doAdd(uow, new IndexListener.MessageContext(context, message, null), delay) + doAdd(uow, context, message, delay) } } } @@ -759,9 +766,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P return db.collectionIsEmpty(key) } + def getCursorPendingLimit: Long = { + pendingCursorAdds.synchronized { Option(pendingCursorAdds.peek).getOrElse(Long.MaxValue) } + } + def recover(listener: MessageRecoveryListener): Unit = { check_running - cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition) + cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition, getCursorPendingLimit) } def resetBatching: Unit = { @@ -770,11 +781,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { check_running - cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned) + cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, getCursorPendingLimit, maxReturned) } override def setBatch(id: MessageId): Unit = { - cursorPosition = db.queuePosition(id) + 1 + cursorPosition = Math.min(getCursorPendingLimit, db.queuePosition(id)) + 1 } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java index 131f807c7b..cfd6534680 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java @@ -89,7 +89,9 @@ public class AMQ5266SingleDestTest { @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") public static Iterable parameters() { return Arrays.asList(new Object[][]{ - {1000, 80, 80, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, + {1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, }); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java index 300bec131c..f7409dd619 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java @@ -37,6 +37,7 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.RegionBroker; @@ -66,7 +67,6 @@ public class AMQ5266StarvedConsumerTest { static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class); String activemqURL; BrokerService brokerService; - private EmbeddedDataSource dataSource; public int messageSize = 1000; @@ -86,16 +86,22 @@ public class AMQ5266StarvedConsumerTest { public boolean useCache = true; @Parameterized.Parameter(5) - public boolean useDefaultStore = false; + public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; @Parameterized.Parameter(6) public boolean optimizeDispatch = false; private AtomicBoolean didNotReceive = new AtomicBoolean(false); - @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") + @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}") public static Iterable parameters() { return Arrays.asList(new Object[][]{ - {1000, 40, 5, 1024*1024, false, false, true}, + {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, + {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, + {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, + + {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, + {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, + {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, }); } @@ -104,21 +110,7 @@ public class AMQ5266StarvedConsumerTest { @Before public void startBroker() throws Exception { brokerService = new BrokerService(); - - dataSource = new EmbeddedDataSource(); - dataSource.setDatabaseName("target/derbyDb"); - dataSource.setCreateDatabase("create"); - - JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); - jdbcPersistenceAdapter.setDataSource(dataSource); - jdbcPersistenceAdapter.setUseLock(false); - - if (!useDefaultStore) { - brokerService.setPersistenceAdapter(jdbcPersistenceAdapter); - } else { - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); - } + TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); brokerService.setDeleteAllMessagesOnStartup(true); brokerService.setUseJmx(false); brokerService.setAdvisorySupport(false); @@ -149,10 +141,6 @@ public class AMQ5266StarvedConsumerTest { if (brokerService != null) { brokerService.stop(); } - try { - dataSource.setShutdownDatabase("shutdown"); - dataSource.getConnection(); - } catch (Exception ignored) {} } CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() { @@ -216,9 +204,6 @@ public class AMQ5266StarvedConsumerTest { try { int secs = (int) (endWait - System.currentTimeMillis()) / 1000; LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); - if (!useDefaultStore) { - DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); - } Thread.sleep(10000); } catch (Exception e) { } @@ -228,12 +213,6 @@ public class AMQ5266StarvedConsumerTest { consumer.shutdown(); - TimeUnit.SECONDS.sleep(2); - LOG.info("DB Contents START"); - if (!useDefaultStore) { - DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); - } - LOG.info("DB Contents END"); LOG.info("Consumer Stats:"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java index efccefa9d7..e180746939 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java @@ -35,15 +35,12 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.derby.jdbc.EmbeddedDataSource; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -65,7 +62,6 @@ public class AMQ5266Test { static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class); String activemqURL = "tcp://localhost:61617"; BrokerService brokerService; - private EmbeddedDataSource dataSource; public int messageSize = 1000; @@ -85,28 +81,34 @@ public class AMQ5266Test { public boolean useCache = true; @Parameterized.Parameter(5) - public boolean useDefaultStore = false; + public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB; @Parameterized.Parameter(6) public boolean optimizeDispatch = false; - @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") + @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}") public static Iterable parameters() { return Arrays.asList(new Object[][]{ - // jdbc - {1, 1, 1, 50*1024, false, false, true}, - {1000, 20, 5, 50*1024, true, false, false}, - {100, 20, 5, 50*1024, false, false, false}, - {1000, 5, 20, 50*1024, true, false, false}, - {1000, 20, 20, 1024*1024, true, false, false}, + {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, + {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, + {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false}, + {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, + {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, + + {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, + {100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, + + {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, + {100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, + {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, + {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, + {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, + {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, - // default store - {1, 1, 1, 50*1024, false, true, true}, - {100, 5, 5, 50*1024, false, true, false}, - {1000, 20, 5, 50*1024, true, true, false}, - {100, 20, 5, 50*1024, false, true, false}, - {1000, 5, 20, 50*1024, true, true, false}, - {1000, 20, 20, 1024*1024, true, true, false}, }); } @@ -115,25 +117,10 @@ public class AMQ5266Test { @Before public void startBroker() throws Exception { brokerService = new BrokerService(); - - dataSource = new EmbeddedDataSource(); - dataSource.setDatabaseName("target/derbyDb"); - dataSource.setCreateDatabase("create"); - - JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); - jdbcPersistenceAdapter.setDataSource(dataSource); - jdbcPersistenceAdapter.setUseLock(false); - - if (!useDefaultStore) { - brokerService.setPersistenceAdapter(jdbcPersistenceAdapter); - } else { - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); - } + TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice); brokerService.setDeleteAllMessagesOnStartup(true); brokerService.setUseJmx(false); - PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract! @@ -159,10 +146,6 @@ public class AMQ5266Test { if (brokerService != null) { brokerService.stop(); } - try { - dataSource.setShutdownDatabase("shutdown"); - dataSource.getConnection(); - } catch (Exception ignored) {} } @Test @@ -211,9 +194,6 @@ public class AMQ5266Test { try { int secs = (int) (endWait - System.currentTimeMillis()) / 1000; LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); - if (!useDefaultStore) { - DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); - } Thread.sleep(10000); } catch (Exception e) { } @@ -223,13 +203,6 @@ public class AMQ5266Test { consumer.shutdown(); - TimeUnit.SECONDS.sleep(2); - LOG.info("DB Contents START"); - if (!useDefaultStore) { - DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); - } - LOG.info("DB Contents END"); - LOG.info("Consumer Stats:"); for (Map.Entry> entry : consumer.getIDs().entrySet()) {