mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5266 tidy up leveldb impl with additional scenario tests
This commit is contained in:
parent
642cc43216
commit
3042797b41
|
@ -21,7 +21,6 @@ import java.sql.SQLException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -73,7 +72,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
|
protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
|
||||||
final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
|
final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
|
||||||
protected ActiveMQMessageAudit audit;
|
protected ActiveMQMessageAudit audit;
|
||||||
protected final List<Long> pendingAdditions = new LinkedList<Long>();
|
protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
|
||||||
|
|
||||||
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
|
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
|
||||||
super(destination);
|
super(destination);
|
||||||
|
@ -131,7 +130,8 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
pendingAdditions.add(sequence);
|
pendingAdditions.add(sequence);
|
||||||
c.onCompletion(new Runnable() {
|
c.onCompletion(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
// message added to db
|
// jdbc close or jms commit - while futureOrSequenceLong==null ordered
|
||||||
|
// work will remain pending on the Queue
|
||||||
message.getMessageId().setFutureOrSequenceLong(sequence);
|
message.getMessageId().setFutureOrSequenceLong(sequence);
|
||||||
message.getMessageId().setEntryLocator(sequence);
|
message.getMessageId().setEntryLocator(sequence);
|
||||||
}
|
}
|
||||||
|
@ -341,6 +341,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
|
||||||
|
}
|
||||||
adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
|
adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
|
||||||
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
|
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
|
||||||
|
|
||||||
|
@ -376,7 +379,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
*/
|
*/
|
||||||
public void resetBatching() {
|
public void resetBatching() {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
|
LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
|
||||||
}
|
}
|
||||||
lastRecoveredSequenceId.set(-1);
|
lastRecoveredSequenceId.set(-1);
|
||||||
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
|
lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
|
||||||
|
@ -394,7 +397,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
lastRecoveredPriority.set(Byte.MAX_VALUE -1);
|
lastRecoveredPriority.set(Byte.MAX_VALUE -1);
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
|
LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
|
||||||
+ ", priority: " + lastRecoveredPriority.get());
|
+ ", priority: " + lastRecoveredPriority.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -403,4 +406,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
public void setPrioritizedMessages(boolean prioritizedMessages) {
|
public void setPrioritizedMessages(boolean prioritizedMessages) {
|
||||||
super.setPrioritizedMessages(prioritizedMessages);
|
super.setPrioritizedMessages(prioritizedMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
|
||||||
var value:T = _
|
var value:T = _
|
||||||
var error:Throwable = _
|
var error:Throwable = _
|
||||||
var listener:Runnable = _
|
var listener:Runnable = _
|
||||||
var id:MessageId = _
|
|
||||||
|
|
||||||
def cancel(mayInterruptIfRunning: Boolean) = false
|
def cancel(mayInterruptIfRunning: Boolean) = false
|
||||||
def isCancelled = false
|
def isCancelled = false
|
||||||
|
@ -116,9 +115,6 @@ class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
|
||||||
|
|
||||||
def set(v:T) = {
|
def set(v:T) = {
|
||||||
value = v
|
value = v
|
||||||
if (id != null) {
|
|
||||||
id.setFutureOrSequenceLong(id.getEntryLocator.asInstanceOf[EntryLocator].seq)
|
|
||||||
}
|
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
fireListener
|
fireListener
|
||||||
}
|
}
|
||||||
|
@ -330,14 +326,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
|
||||||
val entry = QueueEntryRecord(id, queueKey, queueSeq)
|
val entry = QueueEntryRecord(id, queueKey, queueSeq)
|
||||||
assert(id.getEntryLocator == null)
|
assert(id.getEntryLocator == null)
|
||||||
id.setEntryLocator(EntryLocator(queueKey, queueSeq))
|
id.setEntryLocator(EntryLocator(queueKey, queueSeq))
|
||||||
if (message.getTransactionId!=null) {
|
|
||||||
// why does future not get set in tx?
|
|
||||||
id.setFutureOrSequenceLong(queueSeq)
|
|
||||||
} else {
|
|
||||||
id.setFutureOrSequenceLong(countDownFuture)
|
|
||||||
message.setRecievedByDFBridge(true)
|
|
||||||
countDownFuture.id = id
|
|
||||||
}
|
|
||||||
|
|
||||||
val a = this.synchronized {
|
val a = this.synchronized {
|
||||||
if( !delay )
|
if( !delay )
|
||||||
|
@ -741,10 +729,10 @@ class DBManager(val parent:LevelDBStore) {
|
||||||
client.collectionIsEmpty(key)
|
client.collectionIsEmpty(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
|
def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, endPos:Long=Long.MaxValue, max:Long=Long.MaxValue) = {
|
||||||
var lastmsgid:MessageId = null
|
var lastmsgid:MessageId = null
|
||||||
var count = 0L
|
var count = 0L
|
||||||
client.queueCursor(key, startPos) { msg =>
|
client.queueCursor(key, startPos, endPos) { msg =>
|
||||||
if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
|
if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
|
||||||
lastmsgid = msg.getMessageId
|
lastmsgid = msg.getMessageId
|
||||||
count += 1
|
count += 1
|
||||||
|
|
|
@ -1255,8 +1255,8 @@ class LevelDBClient(store: LevelDBStore) {
|
||||||
return rc
|
return rc
|
||||||
}
|
}
|
||||||
|
|
||||||
def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
|
def queueCursor(collectionKey: Long, seq:Long, endSeq:Long)(func: (Message)=>Boolean) = {
|
||||||
collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
|
collectionCursor(collectionKey, encodeLong(seq), encodeLong(endSeq)) { (key, value) =>
|
||||||
val seq = decodeLong(key)
|
val seq = decodeLong(key)
|
||||||
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
|
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
|
||||||
val msg = getMessage(locator)
|
val msg = getMessage(locator)
|
||||||
|
@ -1273,7 +1273,7 @@ class LevelDBClient(store: LevelDBStore) {
|
||||||
}
|
}
|
||||||
|
|
||||||
def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
|
def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
|
||||||
collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
|
collectionCursor(collectionKey, encodeLong(0), encodeLong(Long.MaxValue)) { (key, value) =>
|
||||||
val seq = decodeLong(key)
|
val seq = decodeLong(key)
|
||||||
if( value.getMeta != null ) {
|
if( value.getMeta != null ) {
|
||||||
|
|
||||||
|
@ -1336,12 +1336,12 @@ class LevelDBClient(store: LevelDBStore) {
|
||||||
store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
|
store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
|
||||||
}
|
}
|
||||||
|
|
||||||
def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
|
def collectionCursor(collectionKey: Long, cursorPosition:Buffer, endCursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
|
||||||
val ro = new ReadOptions
|
val ro = new ReadOptions
|
||||||
ro.fillCache(true)
|
ro.fillCache(true)
|
||||||
ro.verifyChecksums(verifyChecksums)
|
ro.verifyChecksums(verifyChecksums)
|
||||||
val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
|
val start = encodeEntryKey(ENTRY_PREFIX, collectionKey, cursorPosition)
|
||||||
val end = encodeLongKey(ENTRY_PREFIX, collectionKey+1)
|
val end = encodeEntryKey(ENTRY_PREFIX, collectionKey, endCursorPosition)
|
||||||
might_fail_using_index {
|
might_fail_using_index {
|
||||||
index.cursorRange(start, end, ro) { case (key, value) =>
|
index.cursorRange(start, end, ro) { case (key, value) =>
|
||||||
func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
|
func(key.buffer.moveHead(9), EntryRecord.FACTORY.parseUnframed(value))
|
||||||
|
|
|
@ -35,7 +35,8 @@ import org.apache.activemq.leveldb.util.Log
|
||||||
import org.apache.activemq.store.PList.PListIterator
|
import org.apache.activemq.store.PList.PListIterator
|
||||||
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
|
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
|
||||||
import org.fusesource.hawtdispatch;
|
import org.fusesource.hawtdispatch;
|
||||||
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
|
import org.apache.activemq.broker.scheduler.JobSchedulerStore
|
||||||
|
import org.apache.activemq.store.IndexListener.MessageContext
|
||||||
|
|
||||||
object LevelDBStore extends Log {
|
object LevelDBStore extends Log {
|
||||||
val DEFAULT_DIRECTORY = new File("LevelDB");
|
val DEFAULT_DIRECTORY = new File("LevelDB");
|
||||||
|
@ -245,7 +246,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
|
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
|
||||||
transaction.xarecovery = (msgs, acks.map(_.ack))
|
transaction.xarecovery = (msgs, acks.map(_.ack))
|
||||||
for ( msg <- msgs ) {
|
for ( msg <- msgs ) {
|
||||||
transaction.add(createMessageStore(msg.getDestination), new IndexListener.MessageContext(null, msg, null), false);
|
transaction.add(createMessageStore(msg.getDestination), null, msg, false);
|
||||||
}
|
}
|
||||||
for ( record <- acks ) {
|
for ( record <- acks ) {
|
||||||
var ack = record.ack
|
var ack = record.ack
|
||||||
|
@ -348,27 +349,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def add(store:LevelDBStore#LevelDBMessageStore, messageContext:IndexListener.MessageContext, delay:Boolean) = {
|
def add(store:LevelDBStore#LevelDBMessageStore, context: ConnectionContext, message: Message, delay:Boolean) = {
|
||||||
commitActions += new TransactionAction() {
|
commitActions += new TransactionAction() {
|
||||||
def commit(uow:DelayableUOW) = {
|
def commit(uow:DelayableUOW) = {
|
||||||
if( prepared ) {
|
if( prepared ) {
|
||||||
uow.dequeue(xacontainer_id, messageContext.message.getMessageId)
|
uow.dequeue(xacontainer_id, message.getMessageId)
|
||||||
}
|
}
|
||||||
var copy = messageContext.message.getMessageId.copy()
|
var copy = message.getMessageId.copy()
|
||||||
copy.setEntryLocator(null)
|
copy.setEntryLocator(null)
|
||||||
messageContext.message.setMessageId(copy)
|
message.setMessageId(copy)
|
||||||
store.doAdd(uow, messageContext, delay)
|
store.doAdd(uow, context, message, delay)
|
||||||
}
|
}
|
||||||
|
|
||||||
def prepare(uow:DelayableUOW) = {
|
def prepare(uow:DelayableUOW) = {
|
||||||
// add it to the xa container instead of the actual store container.
|
// add it to the xa container instead of the actual store container.
|
||||||
uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, messageContext.message, delay)
|
uow.enqueue(xacontainer_id, xaseqcounter.incrementAndGet, message, delay)
|
||||||
xarecovery._1 += messageContext.message
|
xarecovery._1 += message
|
||||||
}
|
}
|
||||||
|
|
||||||
def rollback(uow:DelayableUOW) = {
|
def rollback(uow:DelayableUOW) = {
|
||||||
if( prepared ) {
|
if( prepared ) {
|
||||||
uow.dequeue(xacontainer_id, messageContext.message.getMessageId)
|
uow.dequeue(xacontainer_id, message.getMessageId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -671,23 +672,29 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
val lastSeq: AtomicLong = new AtomicLong(0)
|
val lastSeq: AtomicLong = new AtomicLong(0)
|
||||||
protected var cursorPosition: Long = 0
|
protected var cursorPosition: Long = 0
|
||||||
val preparedAcks = new HashSet[MessageId]()
|
val preparedAcks = new HashSet[MessageId]()
|
||||||
|
val pendingCursorAdds = new LinkedList[Long]()
|
||||||
lastSeq.set(db.getLastQueueEntrySeq(key))
|
lastSeq.set(db.getLastQueueEntrySeq(key))
|
||||||
|
|
||||||
def cursorResetPosition = 0L
|
def cursorResetPosition = 0L
|
||||||
|
|
||||||
def doAdd(uow: DelayableUOW, messageContext:IndexListener.MessageContext, delay:Boolean): CountDownFuture[AnyRef] = {
|
def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
|
||||||
check_running
|
check_running
|
||||||
val seq = lastSeq.incrementAndGet()
|
message.incrementReferenceCount()
|
||||||
messageContext.message.incrementReferenceCount()
|
|
||||||
uow.addCompleteListener({
|
uow.addCompleteListener({
|
||||||
messageContext.message.decrementReferenceCount()
|
message.decrementReferenceCount()
|
||||||
})
|
})
|
||||||
val future = uow.enqueue(key, seq, messageContext.message, delay)
|
val sequence = lastSeq.synchronized {
|
||||||
|
val seq = lastSeq.incrementAndGet()
|
||||||
|
message.getMessageId.setFutureOrSequenceLong(seq);
|
||||||
if (indexListener != null) {
|
if (indexListener != null) {
|
||||||
indexListener.onAdd(messageContext)
|
pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) }
|
||||||
|
indexListener.onAdd(new MessageContext(context, message, new Runnable {
|
||||||
|
def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) }
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
future
|
seq
|
||||||
|
}
|
||||||
|
uow.enqueue(key, sequence, message, delay)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
|
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
|
||||||
|
@ -695,11 +702,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
check_running
|
check_running
|
||||||
message.getMessageId.setEntryLocator(null)
|
message.getMessageId.setEntryLocator(null)
|
||||||
if( message.getTransactionId!=null ) {
|
if( message.getTransactionId!=null ) {
|
||||||
transaction(message.getTransactionId).add(this, new IndexListener.MessageContext(context, message, null), delay)
|
transaction(message.getTransactionId).add(this, context, message, delay)
|
||||||
DONE
|
DONE
|
||||||
} else {
|
} else {
|
||||||
withUow { uow=>
|
withUow { uow=>
|
||||||
doAdd(uow, new IndexListener.MessageContext(context, message, null), delay)
|
doAdd(uow, context, message, delay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -759,9 +766,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
return db.collectionIsEmpty(key)
|
return db.collectionIsEmpty(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def getCursorPendingLimit: Long = {
|
||||||
|
pendingCursorAdds.synchronized { Option(pendingCursorAdds.peek).getOrElse(Long.MaxValue) }
|
||||||
|
}
|
||||||
|
|
||||||
def recover(listener: MessageRecoveryListener): Unit = {
|
def recover(listener: MessageRecoveryListener): Unit = {
|
||||||
check_running
|
check_running
|
||||||
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition)
|
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition, getCursorPendingLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
def resetBatching: Unit = {
|
def resetBatching: Unit = {
|
||||||
|
@ -770,11 +781,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
|
|
||||||
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
||||||
check_running
|
check_running
|
||||||
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
|
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, getCursorPendingLimit, maxReturned)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def setBatch(id: MessageId): Unit = {
|
override def setBatch(id: MessageId): Unit = {
|
||||||
cursorPosition = db.queuePosition(id) + 1
|
cursorPosition = Math.min(getCursorPendingLimit, db.queuePosition(id)) + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,7 +89,9 @@ public class AMQ5266SingleDestTest {
|
||||||
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
|
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
|
||||||
public static Iterable<Object[]> parameters() {
|
public static Iterable<Object[]> parameters() {
|
||||||
return Arrays.asList(new Object[][]{
|
return Arrays.asList(new Object[][]{
|
||||||
{1000, 80, 80, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
{1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
||||||
|
{1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
|
||||||
|
{1000, 40, 40, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import javax.jms.TextMessage;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.RedeliveryPolicy;
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
|
import org.apache.activemq.TestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
|
@ -66,7 +67,6 @@ public class AMQ5266StarvedConsumerTest {
|
||||||
static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
|
static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
|
||||||
String activemqURL;
|
String activemqURL;
|
||||||
BrokerService brokerService;
|
BrokerService brokerService;
|
||||||
private EmbeddedDataSource dataSource;
|
|
||||||
|
|
||||||
public int messageSize = 1000;
|
public int messageSize = 1000;
|
||||||
|
|
||||||
|
@ -86,16 +86,22 @@ public class AMQ5266StarvedConsumerTest {
|
||||||
public boolean useCache = true;
|
public boolean useCache = true;
|
||||||
|
|
||||||
@Parameterized.Parameter(5)
|
@Parameterized.Parameter(5)
|
||||||
public boolean useDefaultStore = false;
|
public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
|
||||||
|
|
||||||
@Parameterized.Parameter(6)
|
@Parameterized.Parameter(6)
|
||||||
public boolean optimizeDispatch = false;
|
public boolean optimizeDispatch = false;
|
||||||
private AtomicBoolean didNotReceive = new AtomicBoolean(false);
|
private AtomicBoolean didNotReceive = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
|
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
|
||||||
public static Iterable<Object[]> parameters() {
|
public static Iterable<Object[]> parameters() {
|
||||||
return Arrays.asList(new Object[][]{
|
return Arrays.asList(new Object[][]{
|
||||||
{1000, 40, 5, 1024*1024, false, false, true},
|
{1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
|
||||||
|
{1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
|
||||||
|
{1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
|
||||||
|
|
||||||
|
{500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
|
||||||
|
{500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
|
||||||
|
{500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,21 +110,7 @@ public class AMQ5266StarvedConsumerTest {
|
||||||
@Before
|
@Before
|
||||||
public void startBroker() throws Exception {
|
public void startBroker() throws Exception {
|
||||||
brokerService = new BrokerService();
|
brokerService = new BrokerService();
|
||||||
|
TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
|
||||||
dataSource = new EmbeddedDataSource();
|
|
||||||
dataSource.setDatabaseName("target/derbyDb");
|
|
||||||
dataSource.setCreateDatabase("create");
|
|
||||||
|
|
||||||
JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
|
|
||||||
jdbcPersistenceAdapter.setDataSource(dataSource);
|
|
||||||
jdbcPersistenceAdapter.setUseLock(false);
|
|
||||||
|
|
||||||
if (!useDefaultStore) {
|
|
||||||
brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
|
|
||||||
} else {
|
|
||||||
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
|
|
||||||
kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
|
|
||||||
}
|
|
||||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||||
brokerService.setUseJmx(false);
|
brokerService.setUseJmx(false);
|
||||||
brokerService.setAdvisorySupport(false);
|
brokerService.setAdvisorySupport(false);
|
||||||
|
@ -149,10 +141,6 @@ public class AMQ5266StarvedConsumerTest {
|
||||||
if (brokerService != null) {
|
if (brokerService != null) {
|
||||||
brokerService.stop();
|
brokerService.stop();
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
dataSource.setShutdownDatabase("shutdown");
|
|
||||||
dataSource.getConnection();
|
|
||||||
} catch (Exception ignored) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
|
CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
|
||||||
|
@ -216,9 +204,6 @@ public class AMQ5266StarvedConsumerTest {
|
||||||
try {
|
try {
|
||||||
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
|
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
|
||||||
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
|
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
|
||||||
if (!useDefaultStore) {
|
|
||||||
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
|
|
||||||
}
|
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
}
|
}
|
||||||
|
@ -228,12 +213,6 @@ public class AMQ5266StarvedConsumerTest {
|
||||||
|
|
||||||
consumer.shutdown();
|
consumer.shutdown();
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(2);
|
|
||||||
LOG.info("DB Contents START");
|
|
||||||
if (!useDefaultStore) {
|
|
||||||
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
|
|
||||||
}
|
|
||||||
LOG.info("DB Contents END");
|
|
||||||
|
|
||||||
LOG.info("Consumer Stats:");
|
LOG.info("Consumer Stats:");
|
||||||
|
|
||||||
|
|
|
@ -35,15 +35,12 @@ import javax.jms.TextMessage;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.RedeliveryPolicy;
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
|
import org.apache.activemq.TestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
|
||||||
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
|
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
|
||||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -65,7 +62,6 @@ public class AMQ5266Test {
|
||||||
static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
|
static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
|
||||||
String activemqURL = "tcp://localhost:61617";
|
String activemqURL = "tcp://localhost:61617";
|
||||||
BrokerService brokerService;
|
BrokerService brokerService;
|
||||||
private EmbeddedDataSource dataSource;
|
|
||||||
|
|
||||||
public int messageSize = 1000;
|
public int messageSize = 1000;
|
||||||
|
|
||||||
|
@ -85,28 +81,34 @@ public class AMQ5266Test {
|
||||||
public boolean useCache = true;
|
public boolean useCache = true;
|
||||||
|
|
||||||
@Parameterized.Parameter(5)
|
@Parameterized.Parameter(5)
|
||||||
public boolean useDefaultStore = false;
|
public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
|
||||||
|
|
||||||
@Parameterized.Parameter(6)
|
@Parameterized.Parameter(6)
|
||||||
public boolean optimizeDispatch = false;
|
public boolean optimizeDispatch = false;
|
||||||
|
|
||||||
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
|
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
|
||||||
public static Iterable<Object[]> parameters() {
|
public static Iterable<Object[]> parameters() {
|
||||||
return Arrays.asList(new Object[][]{
|
return Arrays.asList(new Object[][]{
|
||||||
// jdbc
|
{1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
|
||||||
{1, 1, 1, 50*1024, false, false, true},
|
{1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
|
||||||
{1000, 20, 5, 50*1024, true, false, false},
|
{100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false},
|
||||||
{100, 20, 5, 50*1024, false, false, false},
|
{1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
|
||||||
{1000, 5, 20, 50*1024, true, false, false},
|
{1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
|
||||||
{1000, 20, 20, 1024*1024, true, false, false},
|
|
||||||
|
{1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
|
||||||
|
{100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
||||||
|
{1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
||||||
|
{100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
||||||
|
{1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
||||||
|
{1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
|
||||||
|
|
||||||
|
{1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
|
||||||
|
{100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
|
||||||
|
{1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
|
||||||
|
{100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
|
||||||
|
{1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
|
||||||
|
{1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
|
||||||
|
|
||||||
// default store
|
|
||||||
{1, 1, 1, 50*1024, false, true, true},
|
|
||||||
{100, 5, 5, 50*1024, false, true, false},
|
|
||||||
{1000, 20, 5, 50*1024, true, true, false},
|
|
||||||
{100, 20, 5, 50*1024, false, true, false},
|
|
||||||
{1000, 5, 20, 50*1024, true, true, false},
|
|
||||||
{1000, 20, 20, 1024*1024, true, true, false},
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,25 +117,10 @@ public class AMQ5266Test {
|
||||||
@Before
|
@Before
|
||||||
public void startBroker() throws Exception {
|
public void startBroker() throws Exception {
|
||||||
brokerService = new BrokerService();
|
brokerService = new BrokerService();
|
||||||
|
TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
|
||||||
dataSource = new EmbeddedDataSource();
|
|
||||||
dataSource.setDatabaseName("target/derbyDb");
|
|
||||||
dataSource.setCreateDatabase("create");
|
|
||||||
|
|
||||||
JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
|
|
||||||
jdbcPersistenceAdapter.setDataSource(dataSource);
|
|
||||||
jdbcPersistenceAdapter.setUseLock(false);
|
|
||||||
|
|
||||||
if (!useDefaultStore) {
|
|
||||||
brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
|
|
||||||
} else {
|
|
||||||
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
|
|
||||||
kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
|
|
||||||
}
|
|
||||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||||
brokerService.setUseJmx(false);
|
brokerService.setUseJmx(false);
|
||||||
|
|
||||||
|
|
||||||
PolicyMap policyMap = new PolicyMap();
|
PolicyMap policyMap = new PolicyMap();
|
||||||
PolicyEntry defaultEntry = new PolicyEntry();
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
|
defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
|
||||||
|
@ -159,10 +146,6 @@ public class AMQ5266Test {
|
||||||
if (brokerService != null) {
|
if (brokerService != null) {
|
||||||
brokerService.stop();
|
brokerService.stop();
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
dataSource.setShutdownDatabase("shutdown");
|
|
||||||
dataSource.getConnection();
|
|
||||||
} catch (Exception ignored) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -211,9 +194,6 @@ public class AMQ5266Test {
|
||||||
try {
|
try {
|
||||||
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
|
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
|
||||||
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
|
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
|
||||||
if (!useDefaultStore) {
|
|
||||||
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
|
|
||||||
}
|
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
}
|
}
|
||||||
|
@ -223,13 +203,6 @@ public class AMQ5266Test {
|
||||||
|
|
||||||
consumer.shutdown();
|
consumer.shutdown();
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(2);
|
|
||||||
LOG.info("DB Contents START");
|
|
||||||
if (!useDefaultStore) {
|
|
||||||
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
|
|
||||||
}
|
|
||||||
LOG.info("DB Contents END");
|
|
||||||
|
|
||||||
LOG.info("Consumer Stats:");
|
LOG.info("Consumer Stats:");
|
||||||
|
|
||||||
for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
|
for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
|
||||||
|
|
Loading…
Reference in New Issue