mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - store has messages must be aware of pending also kahadb setBatch for async sends. additional tests and tidy up of cusror sync with store to reflect async/sync additions
This commit is contained in:
parent
243db1c289
commit
9c2b1d2572
|
@ -771,19 +771,24 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
candidate = indexOrderedCursorUpdates.peek();
|
||||
}
|
||||
}
|
||||
for (MessageContext messageContext : orderedUpdates) {
|
||||
if (!cursorAdd(messageContext.message)) {
|
||||
// cursor suppressed a duplicate
|
||||
messageContext.duplicate = true;
|
||||
messagesLock.writeLock().lock();
|
||||
try {
|
||||
for (MessageContext messageContext : orderedUpdates) {
|
||||
if (!messages.addMessageLast(messageContext.message)) {
|
||||
// cursor suppressed a duplicate
|
||||
messageContext.duplicate = true;
|
||||
}
|
||||
if (messageContext.onCompletion != null) {
|
||||
messageContext.onCompletion.run();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
messagesLock.writeLock().unlock();
|
||||
}
|
||||
} finally {
|
||||
sendLock.unlock();
|
||||
}
|
||||
for (MessageContext messageContext : orderedUpdates) {
|
||||
if (messageContext.onCompletion != null) {
|
||||
messageContext.onCompletion.run();
|
||||
}
|
||||
if (!messageContext.duplicate) {
|
||||
messageSent(messageContext.context, messageContext.message);
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ import java.util.Iterator;
|
|||
import java.util.LinkedList;
|
||||
import java.util.ListIterator;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
|
@ -90,6 +92,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
|
||||
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
|
||||
boolean recovered = false;
|
||||
storeHasMessages = true;
|
||||
if (recordUniqueId(message.getMessageId())) {
|
||||
if (!cached) {
|
||||
message.setRegionDestination(regionDestination);
|
||||
|
@ -101,12 +104,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
batchList.addMessageLast(message);
|
||||
clearIterator(true);
|
||||
recovered = true;
|
||||
storeHasMessages = true;
|
||||
} else if (!cached) {
|
||||
// a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
|
||||
if (message.isRecievedByDFBridge()) {
|
||||
// expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true
|
||||
LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
|
||||
}
|
||||
} else {
|
||||
LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
|
||||
duplicate(message);
|
||||
|
@ -201,7 +205,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
boolean disableCache = false;
|
||||
if (hasSpace()) {
|
||||
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
|
||||
LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
|
||||
}
|
||||
setCacheEnabled(true);
|
||||
}
|
||||
if (isCacheEnabled()) {
|
||||
|
@ -217,64 +223,48 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
}
|
||||
|
||||
if (disableCache && isCacheEnabled()) {
|
||||
LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
|
||||
}
|
||||
syncWithStore(node.getMessage());
|
||||
setCacheEnabled(false);
|
||||
syncWithStore();
|
||||
}
|
||||
this.storeHasMessages = true;
|
||||
size++;
|
||||
return true;
|
||||
}
|
||||
|
||||
private void syncWithStore() throws Exception {
|
||||
private void syncWithStore(Message currentAdd) throws Exception {
|
||||
pruneLastCached();
|
||||
if (lastCachedIds[SYNC_ADD] == null) {
|
||||
// only async adds, lets wait on the potential last add and reset from there
|
||||
// possibly only async adds, lets wait on the potential last add and reset from there
|
||||
for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
|
||||
MessageId lastStored = it.previous();
|
||||
Object futureOrLong = lastStored.getFutureOrSequenceLong();
|
||||
MessageId lastPending = it.previous();
|
||||
Object futureOrLong = lastPending.getFutureOrSequenceLong();
|
||||
if (futureOrLong instanceof Future) {
|
||||
Future future = (Future) futureOrLong;
|
||||
if (future.isCancelled()) {
|
||||
continue;
|
||||
} else {
|
||||
try {
|
||||
future.get();
|
||||
setLastCachedId(ASYNC_ADD, lastStored);
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
try {
|
||||
future.get(5, TimeUnit.SECONDS);
|
||||
setLastCachedId(ASYNC_ADD, lastPending);
|
||||
} catch (TimeoutException potentialDeadlock) {
|
||||
LOG.warn("{} timed out waiting for async add", this, potentialDeadlock);
|
||||
} catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();}
|
||||
} else {
|
||||
setLastCachedId(ASYNC_ADD, lastPending);
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (lastCachedIds[ASYNC_ADD] != null) {
|
||||
setBatch(lastCachedIds[ASYNC_ADD]);
|
||||
}
|
||||
} else {
|
||||
// mix of async and sync - async can exceed sync only if next in sequence
|
||||
for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
|
||||
MessageId candidate = it.next();
|
||||
final Object futureOrLong = candidate.getFutureOrSequenceLong();
|
||||
if (futureOrLong instanceof Future) {
|
||||
Future future = (Future) futureOrLong;
|
||||
if (future.isCancelled()) {
|
||||
it.remove();
|
||||
} else {
|
||||
try {
|
||||
future.get();
|
||||
long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
|
||||
if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) {
|
||||
setLastCachedId(SYNC_ADD, candidate);
|
||||
} else {
|
||||
// out of sequence, revert to sync state
|
||||
LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
|
||||
break;
|
||||
}
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
// ensure we don't skip current possibly sync add b/c we waited on the future
|
||||
if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
|
||||
setBatch(lastCachedIds[ASYNC_ADD]);
|
||||
}
|
||||
}
|
||||
if (lastCachedIds[SYNC_ADD] != null) {
|
||||
setBatch(lastCachedIds[SYNC_ADD]);
|
||||
}
|
||||
|
||||
} else {
|
||||
setBatch(lastCachedIds[SYNC_ADD]);
|
||||
}
|
||||
// cleanup
|
||||
lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
|
||||
|
@ -282,7 +272,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
}
|
||||
|
||||
private void trackLastCached(MessageReference node) {
|
||||
if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
|
||||
if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) {
|
||||
pruneLastCached();
|
||||
pendingCachedIds.add(node.getMessageId());
|
||||
} else {
|
||||
|
@ -305,6 +295,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
} else {
|
||||
// complete
|
||||
setLastCachedId(ASYNC_ADD, candidate);
|
||||
|
||||
// keep lock step with sync adds while order is preserved
|
||||
if (lastCachedIds[SYNC_ADD] != null) {
|
||||
long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
|
||||
if (Long.compare((Long)futureOrLong, next) == 0) {
|
||||
setLastCachedId(SYNC_ADD, candidate);
|
||||
} else {
|
||||
// out of sequence, revert to sync state
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
@ -374,13 +377,17 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
this.batchResetNeeded = false;
|
||||
}
|
||||
if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
|
||||
// avoid repeated trips to the store if there is nothing of interest
|
||||
this.storeHasMessages = false;
|
||||
try {
|
||||
doFillBatch();
|
||||
} catch (Exception e) {
|
||||
LOG.error("{} - Failed to fill batch", this, e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
|
||||
if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
|
||||
this.storeHasMessages = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -94,7 +94,9 @@ class QueueStorePrefetch extends AbstractStoreCursor {
|
|||
|
||||
@Override
|
||||
protected void setBatch(MessageId messageId) throws Exception {
|
||||
LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
|
||||
}
|
||||
store.setBatch(messageId);
|
||||
batchResetNeeded = false;
|
||||
}
|
||||
|
@ -109,4 +111,8 @@ class QueueStorePrefetch extends AbstractStoreCursor {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(){
|
||||
return super.toString() + ",store=" + store;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -165,4 +165,9 @@ public class ProxyMessageStore implements MessageStore {
|
|||
public void registerIndexListener(IndexListener indexListener) {
|
||||
delegate.registerIndexListener(indexListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return delegate.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -665,6 +665,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Long location = sd.messageIdIndex.get(tx, key);
|
||||
if (location != null) {
|
||||
Long pending = sd.orderIndex.minPendingAdd();
|
||||
if (pending != null) {
|
||||
location = Math.min(location, pending-1);
|
||||
}
|
||||
sd.orderIndex.setBatch(tx, location);
|
||||
} else {
|
||||
LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity);
|
||||
|
@ -714,6 +718,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
this.localDestinationSemaphore.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(){
|
||||
return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
|
||||
}
|
||||
}
|
||||
|
||||
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
||||
|
|
|
@ -1767,7 +1767,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
// StoredDestination related implementation methods.
|
||||
// /////////////////////////////////////////////////////////////////
|
||||
|
||||
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
|
||||
protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
|
||||
|
||||
static class MessageKeys {
|
||||
final String messageId;
|
||||
|
@ -1886,6 +1886,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
public void trackPendingAddComplete(Long seq) {
|
||||
orderIndex.trackPendingAddComplete(seq);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
|
||||
}
|
||||
}
|
||||
|
||||
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
|
||||
|
@ -2337,7 +2342,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
return 0;
|
||||
}
|
||||
|
||||
private String key(KahaDestination destination) {
|
||||
protected String key(KahaDestination destination) {
|
||||
return destination.getType().getNumber() + ":" + destination.getName();
|
||||
}
|
||||
|
||||
|
|
|
@ -1033,6 +1033,7 @@
|
|||
<exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude>
|
||||
<exclude>org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.*</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -0,0 +1,517 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.broker.region.cursors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class StoreQueueCursorOrderTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class);
|
||||
|
||||
ActiveMQQueue destination = new ActiveMQQueue("queue-"
|
||||
+ StoreQueueCursorOrderTest.class.getSimpleName());
|
||||
BrokerService brokerService;
|
||||
|
||||
final static String mesageIdRoot = "11111:22222:0:";
|
||||
final int messageBytesSize = 1024;
|
||||
final String text = new String(new byte[messageBytesSize]);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
brokerService = createBroker();
|
||||
brokerService.setUseJmx(false);
|
||||
brokerService.deleteAllMessages();
|
||||
brokerService.start();
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
return new BrokerService();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void tesBlockedFuture() throws Exception {
|
||||
final int count = 2;
|
||||
final Message[] messages = new Message[count];
|
||||
final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
|
||||
final ConsumerInfo consumerInfo = new ConsumerInfo();
|
||||
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
consumerInfo.setExclusive(true);
|
||||
|
||||
final Queue queue = new Queue(brokerService, destination,
|
||||
queueMessageStore, destinationStatistics, null);
|
||||
|
||||
queueMessageStore.start();
|
||||
queueMessageStore.registerIndexListener(null);
|
||||
|
||||
QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
|
||||
SystemUsage systemUsage = new SystemUsage();
|
||||
// ensure memory limit is reached
|
||||
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
|
||||
underTest.setSystemUsage(systemUsage);
|
||||
underTest.setEnableAudit(false);
|
||||
underTest.start();
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
ActiveMQTextMessage msg = getMessage(0);
|
||||
messages[1] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.setRecievedByDFBridge(true);
|
||||
FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
}, 2l) {};
|
||||
msg.getMessageId().setFutureOrSequenceLong(future);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
// second message will flip the cache but will be stored before the future task
|
||||
msg = getMessage(1);
|
||||
messages[0] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(1l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
|
||||
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
|
||||
assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
|
||||
|
||||
int dequeueCount = 0;
|
||||
|
||||
underTest.setMaxBatchSize(2);
|
||||
underTest.reset();
|
||||
while (underTest.hasNext() && dequeueCount < count) {
|
||||
MessageReference ref = underTest.next();
|
||||
ref.decrementReferenceCount();
|
||||
underTest.remove();
|
||||
LOG.info("Received message: {} with body: {}",
|
||||
ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
|
||||
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
|
||||
}
|
||||
underTest.release();
|
||||
assertEquals(count, dequeueCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception {
|
||||
final int count = 2;
|
||||
final Message[] messages = new Message[count];
|
||||
final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
|
||||
final ConsumerInfo consumerInfo = new ConsumerInfo();
|
||||
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
consumerInfo.setExclusive(true);
|
||||
|
||||
final Queue queue = new Queue(brokerService, destination,
|
||||
queueMessageStore, destinationStatistics, null);
|
||||
|
||||
queueMessageStore.start();
|
||||
queueMessageStore.registerIndexListener(null);
|
||||
|
||||
QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
|
||||
SystemUsage systemUsage = new SystemUsage();
|
||||
// ensure memory limit is reached
|
||||
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
|
||||
underTest.setSystemUsage(systemUsage);
|
||||
underTest.setEnableAudit(false);
|
||||
underTest.start();
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
ActiveMQTextMessage msg = getMessage(0);
|
||||
messages[1] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.setRecievedByDFBridge(true);
|
||||
final ActiveMQTextMessage msgRef = msg;
|
||||
FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
msgRef.getMessageId().setFutureOrSequenceLong(1l);
|
||||
}
|
||||
}, 1l) {};
|
||||
msg.getMessageId().setFutureOrSequenceLong(future);
|
||||
Executors.newSingleThreadExecutor().submit(future);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
// second message will flip the cache but will be stored before the future task
|
||||
msg = getMessage(1);
|
||||
messages[0] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(1l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
|
||||
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
|
||||
assertEquals("setBatch unset", 0l, queueMessageStore.batch.get());
|
||||
|
||||
int dequeueCount = 0;
|
||||
|
||||
underTest.setMaxBatchSize(2);
|
||||
underTest.reset();
|
||||
while (underTest.hasNext() && dequeueCount < count) {
|
||||
MessageReference ref = underTest.next();
|
||||
ref.decrementReferenceCount();
|
||||
underTest.remove();
|
||||
LOG.info("Received message: {} with body: {}",
|
||||
ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
|
||||
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
|
||||
}
|
||||
underTest.release();
|
||||
assertEquals(count, dequeueCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception {
|
||||
final int count = 2;
|
||||
final Message[] messages = new Message[count];
|
||||
final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
|
||||
final ConsumerInfo consumerInfo = new ConsumerInfo();
|
||||
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
consumerInfo.setExclusive(true);
|
||||
|
||||
final Queue queue = new Queue(brokerService, destination,
|
||||
queueMessageStore, destinationStatistics, null);
|
||||
|
||||
queueMessageStore.start();
|
||||
queueMessageStore.registerIndexListener(null);
|
||||
|
||||
QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
|
||||
SystemUsage systemUsage = new SystemUsage();
|
||||
// ensure memory limit is reached
|
||||
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
|
||||
underTest.setSystemUsage(systemUsage);
|
||||
underTest.setEnableAudit(false);
|
||||
underTest.start();
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
ActiveMQTextMessage msg = getMessage(0);
|
||||
messages[0] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.setRecievedByDFBridge(true);
|
||||
final ActiveMQTextMessage msgRef = msg;
|
||||
FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
msgRef.getMessageId().setFutureOrSequenceLong(0l);
|
||||
}
|
||||
}, 0l) {};
|
||||
msg.getMessageId().setFutureOrSequenceLong(future);
|
||||
Executors.newSingleThreadExecutor().submit(future);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
// second message will flip the cache but will be stored before the future task
|
||||
msg = getMessage(1);
|
||||
messages[1] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.setRecievedByDFBridge(true);
|
||||
final ActiveMQTextMessage msgRe2f = msg;
|
||||
FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
msgRe2f.getMessageId().setFutureOrSequenceLong(1l);
|
||||
}
|
||||
}, 1l) {};
|
||||
msg.getMessageId().setFutureOrSequenceLong(future2);
|
||||
Executors.newSingleThreadExecutor().submit(future2);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
|
||||
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
|
||||
assertEquals("setBatch set", 1l, queueMessageStore.batch.get());
|
||||
|
||||
int dequeueCount = 0;
|
||||
|
||||
underTest.setMaxBatchSize(2);
|
||||
underTest.reset();
|
||||
while (underTest.hasNext() && dequeueCount < count) {
|
||||
MessageReference ref = underTest.next();
|
||||
ref.decrementReferenceCount();
|
||||
underTest.remove();
|
||||
LOG.info("Received message: {} with body: {}",
|
||||
ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
|
||||
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
|
||||
}
|
||||
underTest.release();
|
||||
assertEquals(count, dequeueCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetBatchWithFuture() throws Exception {
|
||||
final int count = 4;
|
||||
final Message[] messages = new Message[count];
|
||||
final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
|
||||
final ConsumerInfo consumerInfo = new ConsumerInfo();
|
||||
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
consumerInfo.setExclusive(true);
|
||||
|
||||
final Queue queue = new Queue(brokerService, destination,
|
||||
queueMessageStore, destinationStatistics, null);
|
||||
|
||||
queueMessageStore.start();
|
||||
queueMessageStore.registerIndexListener(null);
|
||||
|
||||
QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
|
||||
SystemUsage systemUsage = new SystemUsage();
|
||||
// ensure memory limit is reached
|
||||
systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
|
||||
underTest.setSystemUsage(systemUsage);
|
||||
underTest.setEnableAudit(false);
|
||||
underTest.start();
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
ActiveMQTextMessage msg = getMessage(0);
|
||||
messages[0] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.setRecievedByDFBridge(true);
|
||||
final ActiveMQTextMessage msgRef = msg;
|
||||
FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
msgRef.getMessageId().setFutureOrSequenceLong(0l);
|
||||
}
|
||||
}, 0l) {};
|
||||
msg.getMessageId().setFutureOrSequenceLong(future0);
|
||||
underTest.addMessageLast(msg);
|
||||
Executors.newSingleThreadExecutor().submit(future0);
|
||||
|
||||
|
||||
msg = getMessage(1);
|
||||
messages[3] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.setRecievedByDFBridge(true);
|
||||
final ActiveMQTextMessage msgRef1 = msg;
|
||||
FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
msgRef1.getMessageId().setFutureOrSequenceLong(3l);
|
||||
}
|
||||
}, 3l) {};
|
||||
msg.getMessageId().setFutureOrSequenceLong(future1);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
|
||||
msg = getMessage(2);
|
||||
messages[1] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(1l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
// out of order future
|
||||
Executors.newSingleThreadExecutor().submit(future1);
|
||||
|
||||
// sync add to flip cache
|
||||
msg = getMessage(3);
|
||||
messages[2] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(3l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
|
||||
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
|
||||
assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
|
||||
|
||||
int dequeueCount = 0;
|
||||
|
||||
underTest.setMaxBatchSize(count);
|
||||
underTest.reset();
|
||||
while (underTest.hasNext() && dequeueCount < count) {
|
||||
MessageReference ref = underTest.next();
|
||||
ref.decrementReferenceCount();
|
||||
underTest.remove();
|
||||
LOG.info("Received message: {} with body: {}",
|
||||
ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
|
||||
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
|
||||
}
|
||||
underTest.release();
|
||||
assertEquals(count, dequeueCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetBatch() throws Exception {
|
||||
final int count = 3;
|
||||
final Message[] messages = new Message[count];
|
||||
final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
|
||||
final ConsumerInfo consumerInfo = new ConsumerInfo();
|
||||
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
consumerInfo.setExclusive(true);
|
||||
|
||||
final Queue queue = new Queue(brokerService, destination,
|
||||
queueMessageStore, destinationStatistics, null);
|
||||
|
||||
queueMessageStore.start();
|
||||
queueMessageStore.registerIndexListener(null);
|
||||
|
||||
QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
|
||||
SystemUsage systemUsage = new SystemUsage();
|
||||
// ensure memory limit is reached
|
||||
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);
|
||||
underTest.setSystemUsage(systemUsage);
|
||||
underTest.setEnableAudit(false);
|
||||
underTest.start();
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
|
||||
ActiveMQTextMessage msg = getMessage(0);
|
||||
messages[0] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(0l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
msg = getMessage(1);
|
||||
messages[1] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(1l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
|
||||
|
||||
msg = getMessage(2);
|
||||
messages[2] = msg;
|
||||
msg.setMemoryUsage(systemUsage.getMemoryUsage());
|
||||
msg.getMessageId().setFutureOrSequenceLong(2l);
|
||||
underTest.addMessageLast(msg);
|
||||
|
||||
|
||||
assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled());
|
||||
assertEquals("setBatch set", 2l, queueMessageStore.batch.get());
|
||||
|
||||
int dequeueCount = 0;
|
||||
|
||||
underTest.setMaxBatchSize(2);
|
||||
underTest.reset();
|
||||
while (underTest.hasNext() && dequeueCount < count) {
|
||||
MessageReference ref = underTest.next();
|
||||
ref.decrementReferenceCount();
|
||||
underTest.remove();
|
||||
LOG.info("Received message: {} with body: {}",
|
||||
ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
|
||||
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
|
||||
}
|
||||
underTest.release();
|
||||
assertEquals(count, dequeueCount);
|
||||
}
|
||||
|
||||
private ActiveMQTextMessage getMessage(int i) throws Exception {
|
||||
ActiveMQTextMessage message = new ActiveMQTextMessage();
|
||||
MessageId id = new MessageId(mesageIdRoot + i);
|
||||
id.setBrokerSequenceId(i);
|
||||
id.setProducerSequenceId(i);
|
||||
message.setMessageId(id);
|
||||
message.setDestination(destination);
|
||||
message.setPersistent(true);
|
||||
message.setResponseRequired(true);
|
||||
message.setText("Msg:" + i + " " + text);
|
||||
assertEquals(message.getMessageId().getProducerSequenceId(), i);
|
||||
return message;
|
||||
}
|
||||
|
||||
class TestMessageStore extends AbstractMessageStore {
|
||||
final Message[] messages;
|
||||
public AtomicLong batch = new AtomicLong();
|
||||
|
||||
public TestMessageStore(Message[] messages, ActiveMQDestination dest) {
|
||||
super(dest);
|
||||
this.messages = messages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addMessage(ConnectionContext context, Message message) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message getMessage(MessageId identity) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover(MessageRecoveryListener container) throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMessageCount() throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetBatching() {
|
||||
|
||||
}
|
||||
@Override
|
||||
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||
for (int i=batch.intValue();i<messages.length;i++) {
|
||||
LOG.info("recovered index:" + i);
|
||||
listener.recoverMessage(messages[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBatch(MessageId message) {
|
||||
batch.set((Long)message.getFutureOrSequenceLong());
|
||||
batch.incrementAndGet();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,641 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.RedeliveryPolicy;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/*
|
||||
* pause producers if consumers stall and verify broker drained before resume
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class AMQ5266StarvedConsumerTest {
|
||||
static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
|
||||
String activemqURL;
|
||||
BrokerService brokerService;
|
||||
private EmbeddedDataSource dataSource;
|
||||
|
||||
public int messageSize = 1000;
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public int publisherMessagesPerThread = 1000;
|
||||
|
||||
@Parameterized.Parameter(1)
|
||||
public int publisherThreadCount = 20;
|
||||
|
||||
@Parameterized.Parameter(2)
|
||||
public int consumerThreadsPerQueue = 5;
|
||||
|
||||
@Parameterized.Parameter(3)
|
||||
public int destMemoryLimit = 50 * 1024;
|
||||
|
||||
@Parameterized.Parameter(4)
|
||||
public boolean useCache = true;
|
||||
|
||||
@Parameterized.Parameter(5)
|
||||
public boolean useDefaultStore = false;
|
||||
|
||||
@Parameterized.Parameter(6)
|
||||
public boolean optimizeDispatch = false;
|
||||
private AtomicBoolean didNotReceive = new AtomicBoolean(false);
|
||||
|
||||
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
|
||||
public static Iterable<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{1000, 40, 5, 1024*1024, false, false, true},
|
||||
});
|
||||
}
|
||||
|
||||
public int consumerBatchSize = 5;
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
|
||||
dataSource = new EmbeddedDataSource();
|
||||
dataSource.setDatabaseName("target/derbyDb");
|
||||
dataSource.setCreateDatabase("create");
|
||||
|
||||
JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
|
||||
jdbcPersistenceAdapter.setDataSource(dataSource);
|
||||
jdbcPersistenceAdapter.setUseLock(false);
|
||||
|
||||
if (!useDefaultStore) {
|
||||
brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
|
||||
} else {
|
||||
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
|
||||
kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
|
||||
}
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
brokerService.setUseJmx(false);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
|
||||
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
|
||||
defaultEntry.setMaxAuditDepth(publisherThreadCount);
|
||||
defaultEntry.setEnableAudit(true);
|
||||
defaultEntry.setUseCache(useCache);
|
||||
defaultEntry.setMaxPageSize(1000);
|
||||
defaultEntry.setOptimizedDispatch(optimizeDispatch);
|
||||
defaultEntry.setMemoryLimit(destMemoryLimit);
|
||||
defaultEntry.setExpireMessagesPeriod(0);
|
||||
policyMap.setDefaultEntry(defaultEntry);
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
|
||||
|
||||
TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
|
||||
brokerService.start();
|
||||
activemqURL = transportConnector.getPublishableConnectString();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
if (brokerService != null) {
|
||||
brokerService.stop();
|
||||
}
|
||||
try {
|
||||
dataSource.setShutdownDatabase("shutdown");
|
||||
dataSource.getConnection();
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
|
||||
CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// wait for queue size to go to zero
|
||||
try {
|
||||
while (((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) {
|
||||
LOG.info("Total messageCount: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
}
|
||||
} catch (Exception ignored) {
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@Test(timeout = 30 * 60 * 1000)
|
||||
public void test() throws Exception {
|
||||
|
||||
String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9";
|
||||
|
||||
int consumerWaitForConsumption = 5 * 60 * 1000;
|
||||
|
||||
ExportQueuePublisher publisher = null;
|
||||
ExportQueueConsumer consumer = null;
|
||||
|
||||
LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
|
||||
LOG.info("\nBuilding Publisher...");
|
||||
|
||||
publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
|
||||
|
||||
LOG.info("Building Consumer...");
|
||||
|
||||
consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
|
||||
|
||||
|
||||
LOG.info("Starting Publisher...");
|
||||
|
||||
publisher.start();
|
||||
|
||||
LOG.info("Starting Consumer...");
|
||||
|
||||
consumer.start();
|
||||
|
||||
int distinctPublishedCount = 0;
|
||||
|
||||
|
||||
LOG.info("Waiting For Publisher Completion...");
|
||||
|
||||
publisher.waitForCompletion();
|
||||
|
||||
List publishedIds = publisher.getIDs();
|
||||
distinctPublishedCount = new TreeSet(publishedIds).size();
|
||||
|
||||
LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
|
||||
|
||||
|
||||
long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
|
||||
while (!consumer.completed() && System.currentTimeMillis() < endWait) {
|
||||
try {
|
||||
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
|
||||
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
|
||||
if (!useDefaultStore) {
|
||||
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
|
||||
}
|
||||
Thread.sleep(10000);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down.");
|
||||
|
||||
consumer.shutdown();
|
||||
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
LOG.info("DB Contents START");
|
||||
if (!useDefaultStore) {
|
||||
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
|
||||
}
|
||||
LOG.info("DB Contents END");
|
||||
|
||||
LOG.info("Consumer Stats:");
|
||||
|
||||
for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
|
||||
|
||||
List<String> idList = entry.getValue();
|
||||
|
||||
int distinctConsumed = new TreeSet<String>(idList).size();
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" Queue: " + entry.getKey() +
|
||||
" -> Total Messages Consumed: " + idList.size() +
|
||||
", Distinct IDs Consumed: " + distinctConsumed);
|
||||
|
||||
int diff = distinctPublishedCount - distinctConsumed;
|
||||
sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
|
||||
LOG.info(sb.toString());
|
||||
|
||||
assertEquals("expect to get all messages!", 0, diff);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public class ExportQueuePublisher {
|
||||
|
||||
private final String amqUser = ActiveMQConnection.DEFAULT_USER;
|
||||
private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
|
||||
private ActiveMQConnectionFactory connectionFactory = null;
|
||||
private String activemqURL = null;
|
||||
private String activemqQueues = null;
|
||||
// Collection of distinct IDs that the publisher has published.
|
||||
// After a message is published, its UUID will be written to this list for tracking.
|
||||
// This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
|
||||
//private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
|
||||
private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
|
||||
private List<PublisherThread> threads;
|
||||
|
||||
public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
|
||||
|
||||
this.activemqURL = activemqURL;
|
||||
this.activemqQueues = activemqQueues;
|
||||
|
||||
threads = new ArrayList<PublisherThread>();
|
||||
|
||||
// Build the threads and tell them how many messages to publish
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
PublisherThread pt = new PublisherThread(messagesPerThread);
|
||||
threads.add(pt);
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getIDs() {
|
||||
return ids;
|
||||
}
|
||||
|
||||
// Kick off threads
|
||||
public void start() throws Exception {
|
||||
|
||||
for (PublisherThread pt : threads) {
|
||||
pt.start();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for threads to complete. They will complete once they've published all of their messages.
|
||||
public void waitForCompletion() throws Exception {
|
||||
|
||||
for (PublisherThread pt : threads) {
|
||||
pt.join();
|
||||
pt.close();
|
||||
}
|
||||
}
|
||||
|
||||
private Session newSession(QueueConnection queueConnection) throws Exception {
|
||||
return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
}
|
||||
|
||||
private synchronized QueueConnection newQueueConnection() throws Exception {
|
||||
|
||||
if (connectionFactory == null) {
|
||||
connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
|
||||
connectionFactory.setWatchTopicAdvisories(false);
|
||||
}
|
||||
|
||||
// Set the redelivery count to -1 (infinite), or else messages will start dropping
|
||||
// after the queue has had a certain number of failures (default is 6)
|
||||
RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
|
||||
policy.setMaximumRedeliveries(-1);
|
||||
|
||||
QueueConnection amqConnection = connectionFactory.createQueueConnection();
|
||||
amqConnection.start();
|
||||
return amqConnection;
|
||||
}
|
||||
|
||||
private class PublisherThread extends Thread {
|
||||
|
||||
private int count;
|
||||
private QueueConnection qc;
|
||||
private Session session;
|
||||
private MessageProducer mp;
|
||||
private Queue q;
|
||||
|
||||
private PublisherThread(int count) throws Exception {
|
||||
|
||||
this.count = count;
|
||||
|
||||
// Each Thread has its own Connection and Session, so no sync worries
|
||||
qc = newQueueConnection();
|
||||
session = newSession(qc);
|
||||
|
||||
// In our code, when publishing to multiple queues,
|
||||
// we're using composite destinations like below
|
||||
q = new ActiveMQQueue(activemqQueues);
|
||||
mp = session.createProducer(null);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
|
||||
// Loop until we've published enough messages
|
||||
while (count-- > 0) {
|
||||
|
||||
TextMessage tm = session.createTextMessage(getMessageText());
|
||||
String id = UUID.randomUUID().toString();
|
||||
tm.setStringProperty("KEY", id);
|
||||
ids.add(id); // keep track of the key to compare against consumer
|
||||
|
||||
mp.send(q, tm);
|
||||
session.commit();
|
||||
|
||||
if (didNotReceive.get()) {
|
||||
globalProducerHalt.await();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
// Called by waitForCompletion
|
||||
public void close() {
|
||||
|
||||
try {
|
||||
mp.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
try {
|
||||
session.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
try {
|
||||
qc.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
String messageText;
|
||||
private String getMessageText() {
|
||||
|
||||
if (messageText == null) {
|
||||
|
||||
synchronized (this) {
|
||||
|
||||
if (messageText == null) {
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < messageSize; i++) {
|
||||
sb.append("X");
|
||||
}
|
||||
messageText = sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return messageText;
|
||||
}
|
||||
|
||||
|
||||
public class ExportQueueConsumer {
|
||||
|
||||
private final String amqUser = ActiveMQConnection.DEFAULT_USER;
|
||||
private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
|
||||
private final int totalToExpect;
|
||||
private ActiveMQConnectionFactory connectionFactory = null;
|
||||
private String activemqURL = null;
|
||||
private String activemqQueues = null;
|
||||
private String[] queues = null;
|
||||
// Map of IDs that were consumed, keyed by queue name.
|
||||
// We'll compare these against what was published to know if any got stuck or dropped.
|
||||
private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
|
||||
private Map<String, List<ConsumerThread>> threads;
|
||||
|
||||
public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
|
||||
|
||||
this.activemqURL = activemqURL;
|
||||
this.activemqQueues = activemqQueues;
|
||||
this.totalToExpect = totalToExpect;
|
||||
|
||||
queues = this.activemqQueues.split(",");
|
||||
|
||||
for (int i = 0; i < queues.length; i++) {
|
||||
queues[i] = queues[i].trim();
|
||||
}
|
||||
|
||||
threads = new HashMap<String, List<ConsumerThread>>();
|
||||
|
||||
// For each queue, create a list of threads and set up the list of ids
|
||||
for (String q : queues) {
|
||||
|
||||
List<ConsumerThread> list = new ArrayList<ConsumerThread>();
|
||||
|
||||
idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
|
||||
|
||||
for (int i = 0; i < threadsPerQueue; i++) {
|
||||
list.add(new ConsumerThread(q, batchSize));
|
||||
}
|
||||
|
||||
threads.put(q, list);
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, List<String>> getIDs() {
|
||||
return idsByQueue;
|
||||
}
|
||||
|
||||
// Start the threads
|
||||
public void start() throws Exception {
|
||||
|
||||
for (List<ConsumerThread> list : threads.values()) {
|
||||
|
||||
for (ConsumerThread ct : list) {
|
||||
|
||||
ct.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Tell the threads to stop
|
||||
// Then wait for them to stop
|
||||
public void shutdown() throws Exception {
|
||||
|
||||
for (List<ConsumerThread> list : threads.values()) {
|
||||
|
||||
for (ConsumerThread ct : list) {
|
||||
|
||||
ct.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
for (List<ConsumerThread> list : threads.values()) {
|
||||
|
||||
for (ConsumerThread ct : list) {
|
||||
|
||||
ct.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Session newSession(QueueConnection queueConnection) throws Exception {
|
||||
return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
}
|
||||
|
||||
private synchronized QueueConnection newQueueConnection() throws Exception {
|
||||
|
||||
if (connectionFactory == null) {
|
||||
connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
|
||||
connectionFactory.setWatchTopicAdvisories(false);
|
||||
}
|
||||
|
||||
// Set the redelivery count to -1 (infinite), or else messages will start dropping
|
||||
// after the queue has had a certain number of failures (default is 6)
|
||||
RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
|
||||
policy.setMaximumRedeliveries(-1);
|
||||
|
||||
QueueConnection amqConnection = connectionFactory.createQueueConnection();
|
||||
amqConnection.start();
|
||||
return amqConnection;
|
||||
}
|
||||
|
||||
public boolean completed() {
|
||||
for (List<ConsumerThread> list : threads.values()) {
|
||||
|
||||
for (ConsumerThread ct : list) {
|
||||
|
||||
if (ct.isAlive()) {
|
||||
LOG.info("thread for {} is still alive.", ct.qName);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private class ConsumerThread extends Thread {
|
||||
|
||||
private int batchSize;
|
||||
private QueueConnection qc;
|
||||
private Session session;
|
||||
private MessageConsumer mc;
|
||||
private List<String> idList;
|
||||
private boolean shutdown = false;
|
||||
private String qName;
|
||||
|
||||
private ConsumerThread(String queueName, int batchSize) throws Exception {
|
||||
|
||||
this.batchSize = batchSize;
|
||||
|
||||
// Each thread has its own connection and session
|
||||
qName = queueName;
|
||||
qc = newQueueConnection();
|
||||
session = newSession(qc);
|
||||
Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
|
||||
mc = session.createConsumer(q);
|
||||
|
||||
idList = idsByQueue.get(queueName);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
|
||||
int count = 0;
|
||||
|
||||
// Keep reading as long as it hasn't been told to shutdown
|
||||
while (!shutdown) {
|
||||
|
||||
if (idList.size() >= totalToExpect) {
|
||||
LOG.info("Got {} for q: {}", +idList.size(), qName);
|
||||
session.commit();
|
||||
break;
|
||||
}
|
||||
Message m = mc.receive(4000);
|
||||
|
||||
if (m != null) {
|
||||
|
||||
// We received a non-null message, add the ID to our list
|
||||
|
||||
idList.add(m.getStringProperty("KEY"));
|
||||
|
||||
count++;
|
||||
|
||||
// If we've reached our batch size, commit the batch and reset the count
|
||||
|
||||
if (count == batchSize) {
|
||||
session.commit();
|
||||
count = 0;
|
||||
}
|
||||
} else {
|
||||
|
||||
// We didn't receive anything this time, commit any current batch and reset the count
|
||||
|
||||
session.commit();
|
||||
count = 0;
|
||||
|
||||
// Sleep a little before trying to read after not getting a message
|
||||
|
||||
try {
|
||||
if (idList.size() < totalToExpect) {
|
||||
LOG.info("did not receive on {}, current count: {}", qName, idList.size());
|
||||
didNotReceive.set(true);
|
||||
}
|
||||
//sleep(3000);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
|
||||
// Once we exit, close everything
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
shutdown = true;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
||||
try {
|
||||
mc.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
try {
|
||||
session.close();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
try {
|
||||
qc.close();
|
||||
} catch (Exception e) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue