mirror of https://github.com/apache/activemq.git
better support for immediate priority higher message dispatch for durable subs, tidy up audit and page size config for durables, use destination batch size to page in, remove extranious calls to store getSize(), test for duplicate delivery, audit needs to be large when priority is used and needs to be statically configured, prefetch is a bad model in with priority spread, restrict priority values to 0-9 as impl is restricted to 0-9 jms range
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1044368 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
afa3f24107
commit
6519c3e232
|
@ -26,6 +26,7 @@ import javax.jms.JMSException;
|
|||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
|
||||
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
|
@ -84,6 +85,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
|
||||
// statically configured via maxPageSize
|
||||
}
|
||||
|
||||
public void add(ConnectionContext context, Destination destination) throws Exception {
|
||||
super.add(context, destination);
|
||||
// do it just once per destination
|
||||
|
@ -117,12 +123,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
this.context = context;
|
||||
this.info = info;
|
||||
LOG.debug("Activating " + this);
|
||||
int prefetch = info.getPrefetchSize();
|
||||
if (prefetch>0) {
|
||||
prefetch += prefetch/2;
|
||||
}
|
||||
int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
|
||||
this.pending.setMaxAuditDepth(depth);
|
||||
if (!keepDurableSubsActive) {
|
||||
for (Iterator<Destination> iter = destinations.values()
|
||||
.iterator(); iter.hasNext();) {
|
||||
|
@ -134,6 +134,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
synchronized (pending) {
|
||||
pending.setSystemUsage(memoryManager);
|
||||
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
pending.setMaxAuditDepth(getMaxAuditDepth());
|
||||
pending.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
pending.start();
|
||||
// If nothing was in the persistent store, then try to use the
|
||||
// recovery policy.
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.JMSException;
|
||||
import org.apache.activemq.ActiveMQMessageAudit;
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
|
||||
|
@ -67,7 +66,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
protected final SystemUsage usageManager;
|
||||
private final Object pendingLock = new Object();
|
||||
private final Object dispatchLock = new Object();
|
||||
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
|
||||
private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
|
||||
|
||||
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
|
||||
|
@ -569,7 +567,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
int numberToDispatch = countBeforeFull();
|
||||
if (numberToDispatch > 0) {
|
||||
setSlowConsumer(false);
|
||||
pending.setMaxBatchSize(numberToDispatch);
|
||||
setPendingBatchSize(pending, numberToDispatch);
|
||||
int count = 0;
|
||||
pending.reset();
|
||||
while (pending.hasNext() && !isFull()
|
||||
|
@ -614,6 +612,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
|
||||
protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
|
||||
pending.setMaxBatchSize(numberToDispatch);
|
||||
}
|
||||
|
||||
protected boolean dispatch(final MessageReference node) throws IOException {
|
||||
final Message message = node.getMessage();
|
||||
if (message == null) {
|
||||
|
|
|
@ -95,8 +95,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
* the cache. If subsequently, we pull out that message from the store (before its deleted)
|
||||
* it will be a duplicate - but should be ignored
|
||||
*/
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
|
||||
+ " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
|
||||
}
|
||||
storeHasMessages = true;
|
||||
}
|
||||
|
@ -125,7 +126,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
private synchronized void clearIterator(boolean ensureIterator) {
|
||||
boolean haveIterator = this.iterator != null;
|
||||
this.iterator=null;
|
||||
last = null;
|
||||
if(haveIterator&&ensureIterator) {
|
||||
ensureIterator();
|
||||
}
|
||||
|
@ -176,8 +176,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
} else {
|
||||
if (cacheEnabled) {
|
||||
cacheEnabled=false;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
|
||||
+ ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
|
||||
+ " current node seqId: " + node.getMessageId().getBrokerSequenceId());
|
||||
}
|
||||
|
@ -239,11 +239,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
clearIterator(false);
|
||||
batchResetNeeded = true;
|
||||
this.cacheEnabled=false;
|
||||
if (isStarted()) {
|
||||
size = getStoreSize();
|
||||
} else {
|
||||
size = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
|
||||
private static final int UNKNOWN = -1;
|
||||
private final String clientId;
|
||||
private final String subscriberName;
|
||||
private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
|
||||
|
@ -49,7 +50,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
private final PendingMessageCursor nonPersistent;
|
||||
private PendingMessageCursor currentCursor;
|
||||
private final Subscription subscription;
|
||||
private int lastAddPriority = 0;
|
||||
private int cacheCurrentPriority = UNKNOWN;
|
||||
private boolean immediatePriorityDispatch = true;
|
||||
/**
|
||||
* @param broker Broker for this cursor
|
||||
|
@ -72,12 +73,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
this.nonPersistent.setMaxBatchSize(maxBatchSize);
|
||||
this.nonPersistent.setSystemUsage(systemUsage);
|
||||
this.storePrefetches.add(this.nonPersistent);
|
||||
|
||||
if (prioritizedMessages) {
|
||||
setMaxAuditDepth(10*getMaxAuditDepth());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() throws Exception {
|
||||
if (!isStarted()) {
|
||||
lastAddPriority = 0;
|
||||
super.start();
|
||||
for (PendingMessageCursor tsp : storePrefetches) {
|
||||
tsp.setMessageAudit(getMessageAudit());
|
||||
|
@ -107,11 +111,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
|
||||
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
|
||||
TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
|
||||
tsp.setMaxBatchSize(getMaxBatchSize());
|
||||
tsp.setMaxBatchSize(destination.getMaxPageSize());
|
||||
tsp.setSystemUsage(systemUsage);
|
||||
tsp.setMessageAudit(getMessageAudit());
|
||||
tsp.setEnableAudit(isEnableAudit());
|
||||
tsp.setMaxAuditDepth(getMaxAuditDepth());
|
||||
tsp.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
|
||||
topics.put(destination, tsp);
|
||||
storePrefetches.add(tsp);
|
||||
|
@ -184,16 +187,29 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
Destination dest = msg.getRegionDestination();
|
||||
TopicStorePrefetch tsp = topics.get(dest);
|
||||
if (tsp != null) {
|
||||
tsp.addMessageLast(node);
|
||||
|
||||
// tps becomes a highest priority only cache when we have a new higher priority
|
||||
// message and we are not currently caching
|
||||
final int priority = msg.getPriority();
|
||||
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
|
||||
final int priority = msg.getPriority();
|
||||
if (priority > lastAddPriority) {
|
||||
if (priority > tsp.getLastDispatchPriority()) {
|
||||
// go get the latest priority message
|
||||
LOG.debug("Clearing cursor on high priority message " + priority);
|
||||
tsp.clear();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("enabling cache for cursor on high priority message " + priority);
|
||||
}
|
||||
tsp.cacheEnabled = true;
|
||||
cacheCurrentPriority = priority;
|
||||
}
|
||||
} else if (cacheCurrentPriority > 0 && priority < cacheCurrentPriority) {
|
||||
// go to the store to get next priority message as lower priority messages may be recovered
|
||||
// already
|
||||
tsp.clear();
|
||||
cacheCurrentPriority = UNKNOWN;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("disabling/clearing cache for cursor on lower priority message " + priority);
|
||||
}
|
||||
lastAddPriority = priority;
|
||||
}
|
||||
tsp.addMessageLast(node);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,12 +288,10 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
|
||||
@Override
|
||||
public void setMaxBatchSize(int newMaxBatchSize) {
|
||||
if (newMaxBatchSize > getMaxBatchSize()) {
|
||||
for (PendingMessageCursor storePrefetch : storePrefetches) {
|
||||
storePrefetch.setMaxBatchSize(newMaxBatchSize);
|
||||
}
|
||||
super.setMaxBatchSize(newMaxBatchSize);
|
||||
for (PendingMessageCursor storePrefetch : storePrefetches) {
|
||||
storePrefetch.setMaxBatchSize(newMaxBatchSize);
|
||||
}
|
||||
super.setMaxBatchSize(newMaxBatchSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -104,6 +104,10 @@ class TopicStorePrefetch extends AbstractStoreCursor {
|
|||
maxBatchSize, this);
|
||||
}
|
||||
|
||||
public int getLastDispatchPriority() {
|
||||
return last != null? last.getMessage().getPriority() : 9;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
|
||||
|
|
|
@ -213,7 +213,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
sub.setPrefetchSize(getDurableTopicPrefetch());
|
||||
}
|
||||
if (pendingDurableSubscriberPolicy != null) {
|
||||
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
|
||||
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub);
|
||||
cursor.setSystemUsage(memoryManager);
|
||||
sub.setPending(cursor);
|
||||
}
|
||||
|
|
|
@ -357,7 +357,13 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
|||
}
|
||||
|
||||
public void setPriority(byte priority) {
|
||||
this.priority = priority;
|
||||
if (priority < 0) {
|
||||
this.priority = 0;
|
||||
} else if (priority > 9) {
|
||||
this.priority = 9;
|
||||
} else {
|
||||
this.priority = priority;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -66,7 +66,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1]);
|
||||
LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||
|
@ -151,6 +151,9 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
|
||||
lastRecovered.sequence, 0, maxReturned, jdbcListener);
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(key + " last recovered: " + lastRecovered);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||
} finally {
|
||||
|
|
|
@ -264,6 +264,12 @@ public class ActiveMQMessageTest extends TestCase {
|
|||
ActiveMQMessage msg = new ActiveMQMessage();
|
||||
msg.setJMSPriority(this.jmsPriority);
|
||||
assertTrue(msg.getJMSPriority() == this.jmsPriority);
|
||||
|
||||
msg.setJMSPriority(-90);
|
||||
assertEquals(0, msg.getJMSPriority());
|
||||
|
||||
msg.setJMSPriority(90);
|
||||
assertEquals(9, msg.getJMSPriority());
|
||||
}
|
||||
|
||||
public void testClearProperties() throws JMSException {
|
||||
|
|
|
@ -293,28 +293,63 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
|||
TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
|
||||
sub.close();
|
||||
|
||||
ProducerThread producerThread = new ProducerThread(topic, 5000, LOW_PRI);
|
||||
final int numToProduce = 2000;
|
||||
final int[] dups = new int[numToProduce*2];
|
||||
ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI+1);
|
||||
producerThread.run();
|
||||
LOG.info("Low priority messages sent");
|
||||
|
||||
sub = sess.createDurableSubscriber(topic, subName);
|
||||
for (int i=0; i<200;i++) {
|
||||
final int batchSize = 250;
|
||||
int lowLowCount = 0;
|
||||
for (int i=0; i<numToProduce; i++) {
|
||||
Message msg = sub.receive(15000);
|
||||
LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + "-" + msg.getJMSPriority() : null));
|
||||
assertNotNull("Message " + i + " was null", msg);
|
||||
assertEquals("Message " + i + " has wrong priority", LOW_PRI+1, msg.getJMSPriority());
|
||||
assertTrue("not duplicate ", dups[i] == 0);
|
||||
dups[i] = 1;
|
||||
|
||||
if (i % batchSize == 0) {
|
||||
producerThread.setMessagePriority(HIGH_PRI);
|
||||
producerThread.setMessageCount(1);
|
||||
producerThread.run();
|
||||
LOG.info("High priority message sent, should be able to receive immediately");
|
||||
|
||||
if (i % batchSize*2 == 0) {
|
||||
producerThread.setMessagePriority(HIGH_PRI -1);
|
||||
producerThread.setMessageCount(1);
|
||||
producerThread.run();
|
||||
LOG.info("High -1 priority message sent, should be able to receive immediately");
|
||||
}
|
||||
|
||||
if (i % batchSize*4 == 0) {
|
||||
producerThread.setMessagePriority(LOW_PRI);
|
||||
producerThread.setMessageCount(1);
|
||||
producerThread.run();
|
||||
lowLowCount++;
|
||||
LOG.info("Low low priority message sent, should not be able to receive immediately");
|
||||
}
|
||||
|
||||
msg = sub.receive(15000);
|
||||
assertNotNull("Message was null", msg);
|
||||
LOG.info("received hi? : " + msg);
|
||||
assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
|
||||
|
||||
if (i % batchSize*2 == 0) {
|
||||
msg = sub.receive(15000);
|
||||
assertNotNull("Message was null", msg);
|
||||
LOG.info("received hi -1 ? i=" + i + ", " + msg);
|
||||
assertEquals("high priority", HIGH_PRI -1, msg.getJMSPriority());
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i=0; i<lowLowCount; i++) {
|
||||
Message msg = sub.receive(15000);
|
||||
LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
|
||||
assertNotNull("Message " + i + " was null", msg);
|
||||
assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
|
||||
}
|
||||
|
||||
producerThread.setMessagePriority(HIGH_PRI);
|
||||
producerThread.setMessageCount(1);
|
||||
producerThread.run();
|
||||
LOG.info("High priority message sent");
|
||||
|
||||
// try and get the high priority message
|
||||
Message msg = sub.receive(15000);
|
||||
assertNotNull("Message was null", msg);
|
||||
LOG.info("received: " + msg);
|
||||
assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,293 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class JdbcDurableSubDupTest {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(JdbcDurableSubDupTest.class);
|
||||
final int prefetchVal = 150;
|
||||
String url = "tcp://localhost:61616?jms.watchTopicAdvisories=false";
|
||||
String queueName = "topicTest?consumer.prefetchSize=" + prefetchVal;
|
||||
String xmlMessage = "<Example 01234567890123456789012345678901234567890123456789 MessageText>";
|
||||
|
||||
String selector = "";
|
||||
String clntVersion = "87";
|
||||
String clntId = "timsClntId345" + clntVersion;
|
||||
String subscriptionName = "subscriptionName-y" + clntVersion;
|
||||
SimpleDateFormat dtf = new SimpleDateFormat("HH:mm:ss");
|
||||
|
||||
final int TO_RECEIVE = 5000;
|
||||
BrokerService broker = null;
|
||||
Vector<Throwable> exceptions = new Vector();
|
||||
final int MAX_MESSAGES = 100000;
|
||||
int[] dupChecker = new int[MAX_MESSAGES];
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
exceptions.clear();
|
||||
for (int i = 0; i < MAX_MESSAGES; i++) {
|
||||
dupChecker[i] = 0;
|
||||
}
|
||||
broker = new BrokerService();
|
||||
broker.setAdvisorySupport(false);
|
||||
broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
|
||||
PolicyEntry policyEntry = new PolicyEntry();
|
||||
policyEntry.setMaxAuditDepth(2000);
|
||||
policyEntry.setMaxPageSize(150);
|
||||
policyEntry.setPrioritizedMessages(true);
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setDefaultEntry(policyEntry);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
|
||||
broker.addConnector("tcp://localhost:61616");
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoDupsOnSlowConsumerReconnect() throws Exception {
|
||||
JmsConsumerDup consumer = new JmsConsumerDup();
|
||||
consumer.done.set(true);
|
||||
consumer.run();
|
||||
|
||||
consumer.done.set(false);
|
||||
|
||||
LOG.info("serial production then consumption");
|
||||
JmsProvider provider = new JmsProvider();
|
||||
provider.run();
|
||||
|
||||
consumer.run();
|
||||
|
||||
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||
|
||||
for (int i = 0; i < TO_RECEIVE; i++) {
|
||||
assertTrue("got message " + i, dupChecker[i] == 1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoDupsOnSlowConsumerLargePriorityGapReconnect() throws Exception {
|
||||
JmsConsumerDup consumer = new JmsConsumerDup();
|
||||
consumer.done.set(true);
|
||||
consumer.run();
|
||||
|
||||
consumer.done.set(false);
|
||||
JmsProvider provider = new JmsProvider();
|
||||
provider.priorityModulator = 2500;
|
||||
provider.run();
|
||||
|
||||
consumer.run();
|
||||
|
||||
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||
for (int i = 0; i < TO_RECEIVE; i++) {
|
||||
assertTrue("got message " + i, dupChecker[i] == 1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class JmsConsumerDup implements MessageListener {
|
||||
long count = 0;
|
||||
|
||||
AtomicBoolean done = new AtomicBoolean(false);
|
||||
|
||||
public void run() {
|
||||
Connection connection = null;
|
||||
Session session;
|
||||
Topic topic;
|
||||
ActiveMQConnectionFactory factory;
|
||||
MessageConsumer consumer;
|
||||
|
||||
factory = new ActiveMQConnectionFactory(url);
|
||||
|
||||
try {
|
||||
connection = factory.createConnection("MyUsername", "MyPassword");
|
||||
connection.setClientID(clntId);
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
topic = session.createTopic(queueName);
|
||||
consumer = session.createDurableSubscriber(topic, subscriptionName, selector, false);
|
||||
consumer.setMessageListener(this);
|
||||
LOG.info("Waiting for messages...");
|
||||
|
||||
while (!done.get()) {
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
if (count == TO_RECEIVE || !exceptions.isEmpty()) {
|
||||
done.set(true);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("caught", e);
|
||||
exceptions.add(e);
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
LOG.info("consumer done (" + exceptions.isEmpty() + "), closing connection");
|
||||
connection.close();
|
||||
} catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void onMessage(Message message) {
|
||||
++count;
|
||||
|
||||
try {
|
||||
Thread.sleep(0L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
;
|
||||
|
||||
try {
|
||||
TextMessage m = (TextMessage) message;
|
||||
|
||||
if (count%100 == 0) {
|
||||
LOG.info("Rcvd Msg #-" + count + " " + m.getText()
|
||||
+ " Sent->" + dtf.format(new Date(m.getJMSTimestamp()))
|
||||
+ " Recv->" + dtf.format(new Date())
|
||||
+ " Expr->" + dtf.format(new Date(m.getJMSExpiration()))
|
||||
+ ", mid: " + m.getJMSMessageID()
|
||||
);
|
||||
}
|
||||
int i = m.getIntProperty("SeqNo");
|
||||
|
||||
//check for duplicate messages
|
||||
if (i < MAX_MESSAGES) {
|
||||
if (dupChecker[i] == 1) {
|
||||
LOG.error("Duplicate message received at count: " + count + ", id: " + m.getJMSMessageID());
|
||||
exceptions.add(new RuntimeException("Got Duplicate at: " + m.getJMSMessageID()));
|
||||
|
||||
} else {
|
||||
dupChecker[i] = 1;
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
LOG.error("caught ", e);
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class JmsProvider implements Runnable {
|
||||
|
||||
int priorityModulator = 10;
|
||||
|
||||
public void run() {
|
||||
|
||||
Connection connection;
|
||||
Session session;
|
||||
Topic topic;
|
||||
|
||||
ActiveMQConnectionFactory factory;
|
||||
MessageProducer messageProducer;
|
||||
long timeToLive = 0l;
|
||||
|
||||
TextMessage message = null;
|
||||
|
||||
factory = new ActiveMQConnectionFactory(url);
|
||||
|
||||
try {
|
||||
connection = factory.createConnection("MyUserName", "MyPassword");
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
topic = session.createTopic(queueName);
|
||||
messageProducer = session.createProducer(topic);
|
||||
messageProducer.setPriority(3);
|
||||
messageProducer.setTimeToLive(timeToLive);
|
||||
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
int msgSeqNo = 0;
|
||||
int NUM_MSGS = 1000;
|
||||
int NUM_GROUPS = TO_RECEIVE/NUM_MSGS;
|
||||
for (int n = 0; n < NUM_GROUPS; n++) {
|
||||
|
||||
message = session.createTextMessage();
|
||||
|
||||
for (int i = 0; i < NUM_MSGS; i++) {
|
||||
int priority = 0;
|
||||
if (priorityModulator <= 10) {
|
||||
priority = msgSeqNo % priorityModulator;
|
||||
} else {
|
||||
priority = (msgSeqNo >= priorityModulator) ? 9 : 0;
|
||||
}
|
||||
message.setText(xmlMessage + msgSeqNo + "-" + priority);
|
||||
message.setJMSPriority(priority);
|
||||
message.setIntProperty("SeqNo", msgSeqNo);
|
||||
if (i > 0 && i%100 == 0) {
|
||||
LOG.info("Sending message: " + message.getText());
|
||||
}
|
||||
messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), timeToLive);
|
||||
msgSeqNo++;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (JMSException e) {
|
||||
LOG.error("caught ", e);
|
||||
e.printStackTrace();
|
||||
exceptions.add(e);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue