diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 7c67446ca8..a35b404701 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -49,6 +49,9 @@ public abstract class BaseDestination implements Destination { public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; + public static final int MAX_PRODUCERS_TO_AUDIT = 64; + public static final int MAX_AUDIT_DEPTH = 2048; + protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 0863d6cc77..83fafca35b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -63,7 +63,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } - public boolean isActive() { + public final boolean isActive() { return active.get(); } @@ -220,6 +220,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us super.add(node); } + protected void dispatchPending() throws IOException { + if (isActive()) { + super.dispatchPending(); + } + } + protected void doAddRecoveredMessage(MessageReference message) throws Exception { synchronized(pending) { pending.addRecoveredMessage(message); @@ -239,7 +245,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } protected boolean canDispatch(MessageReference node) { - return active.get(); + return isActive(); } protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 0cc6752678..a6a5de1382 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -40,8 +40,8 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs protected int memoryUsageHighWaterMark = 70; protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE; protected SystemUsage systemUsage; - protected int maxProducersToAudit=1024; - protected int maxAuditDepth=1000; + protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT; + protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH; protected boolean enableAudit=true; protected ActiveMQMessageAudit audit; protected boolean useCache=true; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 73a40fb944..fd9a01f0d1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -36,10 +36,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private Iterator iterator = null; protected boolean cacheEnabled=false; protected boolean batchResetNeeded = true; - protected boolean storeHasMessages = false; + private boolean storeHasMessages = false; protected int size; private MessageId lastCachedId; - + private boolean hadSpace = false; + protected AbstractStoreCursor(Destination destination) { super((destination != null ? destination.isPrioritizedMessages():false)); this.regionDestination=destination; @@ -89,6 +90,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i batchList.addMessageLast(message); clearIterator(true); recovered = true; + storeHasMessages = true; } else { /* * we should expect to get these - as the message is recorded as it before it goes into @@ -99,7 +101,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority()); } - storeHasMessages = true; } return recovered; } @@ -187,6 +188,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } } + this.storeHasMessages = true; size++; } @@ -229,7 +231,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } - public final synchronized void gc() { + public synchronized void gc() { for (Iteratori = batchList.iterator();i.hasNext();) { MessageReference msg = i.next(); rollback(msg.getMessageId()); @@ -240,8 +242,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i batchResetNeeded = true; this.cacheEnabled=false; } - - + + @Override + public boolean hasSpace() { + hadSpace = super.hasSpace(); + return hadSpace; + } + protected final synchronized void fillBatch() { if (LOG.isTraceEnabled()) { LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded @@ -251,7 +258,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i resetBatch(); this.batchResetNeeded = false; } - if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) { + if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) { this.storeHasMessages = false; try { doFillBatch(); @@ -259,7 +266,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i LOG.error("Failed to fill batch", e); throw new RuntimeException(e); } - if (!this.batchList.isEmpty()) { + if (!this.batchList.isEmpty() || !hadSpace) { this.storeHasMessages=true; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index cd758fb79c..1f6379d5f6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -50,7 +50,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private final PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; private final Subscription subscription; - private int cacheCurrentPriority = UNKNOWN; + private int cacheCurrentLowestPriority = UNKNOWN; private boolean immediatePriorityDispatch = true; /** * @param broker Broker for this cursor @@ -187,27 +187,27 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { Destination dest = msg.getRegionDestination(); TopicStorePrefetch tsp = topics.get(dest); if (tsp != null) { - - // tps becomes a highest priority only cache when we have a new higher priority - // message and we are not currently caching + // cache can be come high priority cache for immediate dispatch final int priority = msg.getPriority(); if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) { - if (priority > tsp.getLastDispatchPriority()) { - // go get the latest priority message + if (priority > tsp.getCurrentLowestPriority()) { if (LOG.isTraceEnabled()) { - LOG.trace("enabling cache for cursor on high priority message " + priority); + LOG.trace("enabling cache for cursor on high priority message " + priority + + ", current lowest: " + tsp.getCurrentLowestPriority()); } tsp.cacheEnabled = true; - cacheCurrentPriority = priority; + cacheCurrentLowestPriority = tsp.getCurrentLowestPriority(); } - } else if (cacheCurrentPriority > 0 && priority < cacheCurrentPriority) { + } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) { // go to the store to get next priority message as lower priority messages may be recovered - // already - tsp.clear(); - cacheCurrentPriority = UNKNOWN; + // already and need to acked sequence order if (LOG.isTraceEnabled()) { - LOG.trace("disabling/clearing cache for cursor on lower priority message " + priority); + LOG.trace("disabling/clearing cache for cursor on lower priority message " + + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority() + + " cache lowest: " + cacheCurrentLowestPriority); } + tsp.cacheEnabled = false; + cacheCurrentLowestPriority = UNKNOWN; } tsp.addMessageLast(node); } @@ -299,6 +299,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { for (PendingMessageCursor tsp : storePrefetches) { tsp.gc(); } + cacheCurrentLowestPriority = UNKNOWN; } @Override diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index aa93a3796b..800e8268c0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -38,6 +38,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { private final String clientId; private final String subscriberName; private final Subscription subscription; + private int currentLowestPriority; /** * @param topic @@ -52,6 +53,15 @@ class TopicStorePrefetch extends AbstractStoreCursor { this.subscriberName = subscriberName; this.maxProducersToAudit=32; this.maxAuditDepth=10000; + resetCurrentLowestPriority(); + } + + private void resetCurrentLowestPriority() { + currentLowestPriority = 9; + } + + public synchronized int getCurrentLowestPriority() { + return currentLowestPriority; } public boolean recoverMessageReference(MessageId messageReference) throws Exception { @@ -62,13 +72,19 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("recover: " + message.getMessageId() + ", priority: " + message.getPriority()); + } + boolean recovered = false; MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); messageEvaluationContext.setMessageReference(message); if (this.subscription.matches(message, messageEvaluationContext)) { - return super.recoverMessage(message, cached); + recovered = super.recoverMessage(message, cached); + if (recovered) { + currentLowestPriority = Math.min(currentLowestPriority, message.getPriority()); + } } - return false; - + return recovered; } @Override @@ -84,7 +100,11 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override protected synchronized boolean isStoreEmpty() { try { - return this.store.isEmpty(); + boolean empty = this.store.isEmpty(); + if (empty) { + resetCurrentLowestPriority(); + } + return empty; } catch (Exception e) { LOG.error("Failed to get message count", e); @@ -97,6 +117,12 @@ class TopicStorePrefetch extends AbstractStoreCursor { protected void resetBatch() { this.store.resetBatching(clientId, subscriberName); } + + @Override + public synchronized void gc() { + super.gc(); + resetCurrentLowestPriority(); + } @Override protected void doFillBatch() throws Exception { @@ -104,10 +130,6 @@ class TopicStorePrefetch extends AbstractStoreCursor { maxBatchSize, this); } - public int getLastDispatchPriority() { - return last != null? last.getMessage().getPriority() : 9; - } - @Override public String toString() { return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")"; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index e61a753252..1ba2f4e4ec 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -55,9 +55,9 @@ public class PolicyEntry extends DestinationMapEntry { private PendingQueueMessageStoragePolicy pendingQueuePolicy; private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; - private int maxProducersToAudit=32; - private int maxAuditDepth=2048; - private int maxQueueAuditDepth=2048; + private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT; + private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; + private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; private boolean enableAudit=true; private boolean producerFlowControl = true; private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; @@ -217,7 +217,12 @@ public class PolicyEntry extends DestinationMapEntry { cursor.setSystemUsage(memoryManager); sub.setPending(cursor); } - sub.setMaxAuditDepth(getMaxAuditDepth()); + int auditDepth = getMaxAuditDepth(); + if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) { + sub.setMaxAuditDepth(auditDepth * 10); + } else { + sub.setMaxAuditDepth(auditDepth); + } sub.setMaxProducersToAudit(getMaxProducersToAudit()); sub.setUsePrefetchExtension(isUsePrefetchExtension()); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index a2dccec342..bf4df92a6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -144,6 +144,9 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess } }; + if (LOG.isTraceEnabled()) { + LOG.trace(key + " existing last recovered: " + lastRecovered); + } if (isPrioritizedMessages()) { adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener); @@ -223,7 +226,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess int result = 0; TransactionContext c = persistenceAdapter.getTransactionContext(); try { - result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); + result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java index fc5cba5048..2f3ac69520 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -98,7 +98,7 @@ public class NegativeQueueTest extends AutoFailTestSupport { protected static int PREFETCH_SIZE = 1000; protected BrokerService broker; - protected String bindAddress = "tcp://localhost:60706"; + protected String bindAddress = "tcp://localhost:0"; public void testWithDefaultPrefetch() throws Exception{ PREFETCH_SIZE = 1000; @@ -311,6 +311,7 @@ public class NegativeQueueTest extends AutoFailTestSupport { configureBroker(answer); answer.start(); answer.waitUntilStarted(); + bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString(); return answer; } @@ -329,7 +330,7 @@ public class NegativeQueueTest extends AutoFailTestSupport { pMap.setDefaultEntry(policy); answer.setDestinationPolicy(pMap); answer.setDeleteAllMessagesOnStartup(true); - answer.addConnector(bindAddress); + answer.addConnector("tcp://localhost:0"); MemoryUsage memoryUsage = new MemoryUsage(); memoryUsage.setLimit(MEMORY_USAGE); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index b11686ae43..682bba93c5 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -54,6 +54,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { public boolean useCache = true; public boolean dispatchAsync = true; public boolean prioritizeMessages = true; + public boolean immediatePriorityDispatch = true; public int prefetchVal = 500; public int MSG_NUM = 600; @@ -73,7 +74,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { policy.setUseCache(useCache); StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = new StorePendingDurableSubscriberMessageStoragePolicy(); - durableSubPending.setImmediatePriorityDispatch(true); + durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch); policy.setPendingDurableSubscriberPolicy(durableSubPending); PolicyMap policyMap = new PolicyMap(); policyMap.put(new ActiveMQQueue("TEST"), policy); diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java index 1327bb38d1..15f13451de 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -22,13 +22,18 @@ import java.util.HashMap; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.TopicSubscriber; import junit.framework.Test; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.MessagePriorityTest; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.derby.jdbc.EmbeddedDataSource; @@ -36,7 +41,7 @@ import org.apache.derby.jdbc.EmbeddedDataSource; public class JDBCMessagePriorityTest extends MessagePriorityTest { private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class); - + @Override protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); @@ -81,9 +86,9 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { sub = sess.createDurableSubscriber(topic, subName); for (int i = 0; i < MSG_NUM * 4; i++) { Message msg = sub.receive(10000); - LOG.debug("received i=" + i + ", m=" + (msg!=null? + LOG.debug("received i=" + i + ", m=" + (msg != null ? msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() - : null) ); + : null)); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority()); if (i > 0 && i % closeFrequency == 0) { @@ -97,7 +102,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { } public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() { - addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("prioritizeMessages", new Object[]{Boolean.TRUE, Boolean.FALSE}); } public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception { @@ -115,7 +120,7 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority]; Vector producers = new Vector(); - for (int priority=0; priority 0 && i % closeFrequency == 0) { @@ -151,6 +156,105 @@ public class JDBCMessagePriorityTest extends MessagePriorityTest { } } + public void initCombosForTestConcurrentRate() { + addCombinationValues("prefetchVal", new Object[]{new Integer(1), new Integer(500)}); + } + + public void testConcurrentRate() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); + final String subName = "priorityConcurrent"; + Connection consumerConn = factory.createConnection(); + consumerConn.setClientID("subName"); + consumerConn.start(); + Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName); + sub.close(); + + final int TO_SEND = 2000; + final Vector duplicates = new Vector(); + final int[] dups = new int[TO_SEND * 4]; + long start; + double max = 0, sum = 0; + MessageProducer messageProducer = sess.createProducer(topic); + TextMessage message = sess.createTextMessage(); + for (int i = 0; i < TO_SEND; i++) { + int priority = i % 10; + message.setText(i + "-" + priority); + message.setIntProperty("seq", i); + message.setJMSPriority(priority); + if (i > 0 && i % 1000 == 0) { + LOG.info("Max send time: " + max + ". Sending message: " + message.getText()); + } + start = System.currentTimeMillis(); + messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0); + long duration = System.currentTimeMillis() - start; + max = Math.max(max, duration); + if (duration == max) { + LOG.info("new max: " + max + " on i=" + i + ", " + message.getText()); + } + sum += duration; + } + + LOG.info("Sent: " + TO_SEND + ", max send time: " + max); + + double noConsumerAve = (sum * 100 / TO_SEND); + sub = consumerSession.createDurableSubscriber(topic, subName); + final AtomicInteger count = new AtomicInteger(); + sub.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + count.incrementAndGet(); + if (count.get() % 100 == 0) { + LOG.info("onMessage: count: " + count.get() + ", " + ((TextMessage) message).getText() + ", seqNo " + message.getIntProperty("seq") + ", " + message.getJMSMessageID()); + } + int seqNo = message.getIntProperty("seq"); + if (dups[seqNo] == 0) { + dups[seqNo] = 1; + } else { + LOG.error("Duplicate: " + ((TextMessage) message).getText() + ", " + message.getJMSMessageID()); + duplicates.add(message); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + LOG.info("Activated consumer"); + sum = max = 0; + for (int i = TO_SEND; i < (TO_SEND * 2); i++) { + int priority = i % 10; + message.setText(i + "-" + priority); + message.setIntProperty("seq", i); + message.setJMSPriority(priority); + if (i > 0 && i % 1000 == 0) { + LOG.info("Max send time: " + max + ". Sending message: " + message.getText()); + } + start = System.currentTimeMillis(); + messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0); + long duration = System.currentTimeMillis() - start; + max = Math.max(max, duration); + if (duration == max) { + LOG.info("new max: " + max + " on i=" + i + ", " + message.getText()); + } + sum += duration; + } + LOG.info("Sent another: " + TO_SEND + ", max send time: " + max); + + double withConsumerAve = (sum * 100 / TO_SEND); + assertTrue("max three times as slow with consumer:" + withConsumerAve + " , noConsumerMax:" + noConsumerAve, + withConsumerAve < noConsumerAve * 3); + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + LOG.info("count: " + count.get()); + return TO_SEND * 2 == count.get(); + } + }, 60 * 1000); + + assertTrue("No duplicates : " + duplicates, duplicates.isEmpty()); + assertEquals("got all messages", TO_SEND * 2, count.get()); + } + public static Test suite() { return suite(JDBCMessagePriorityTest.class); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java index 64153c8eef..97f7f5b702 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java @@ -50,7 +50,8 @@ public class JdbcDurableSubDupTest { private static final Log LOG = LogFactory.getLog(JdbcDurableSubDupTest.class); final int prefetchVal = 150; - String url = "tcp://localhost:61616?jms.watchTopicAdvisories=false"; + String urlOptions = "jms.watchTopicAdvisories=false"; + String url = null; String queueName = "topicTest?consumer.prefetchSize=" + prefetchVal; String xmlMessage = ""; @@ -83,10 +84,11 @@ public class JdbcDurableSubDupTest { policyMap.setDefaultEntry(policyEntry); broker.setDestinationPolicy(policyMap); - broker.addConnector("tcp://localhost:61616"); + broker.addConnector("tcp://localhost:0"); broker.setDeleteAllMessagesOnStartup(true); broker.start(); broker.waitUntilStarted(); + url = broker.getTransportConnectors().get(0).getConnectUri().toString() + "?" + urlOptions; } @After