From 9216c18c73e3bf5d755dff963750f08106423b56 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 18 Feb 2009 09:44:31 +0000 Subject: [PATCH] little refactor of recovery dispatch as now only used for browser dispatch git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@745456 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 105 ++++++++-------- .../broker/region/QueueDispatchSelector.java | 7 +- .../activemq/broker/region/TempQueue.java | 22 ---- .../apache/activemq/broker/BrokerTest.java | 118 ++++++++++++++++++ 4 files changed, 175 insertions(+), 77 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 57731afb3b..fa3e246782 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -214,12 +215,37 @@ public class Queue extends BaseDestination implements Task { } } - class RecoveryDispatch { - public ArrayList messages; - public Subscription subscription; + /* + * Holder for subscription and pagedInMessages as a browser + * needs access to existing messages in the queue that have + * already been dispatched + */ + class BrowserDispatch { + ArrayList messages; + QueueBrowserSubscription browser; + + public BrowserDispatch(QueueBrowserSubscription browserSubscription, + Collection values) { + + messages = new ArrayList(values); + browser = browserSubscription; + browser.incrementQueueRef(); + } + + void done() { + try { + browser.decrementQueueRef(); + } catch (Exception e) { + LOG.warn("decrement ref on browser: " + browser, e); + } + } + + public QueueBrowserSubscription getBrowser() { + return browser; + } } - LinkedList recoveries = new LinkedList(); + LinkedList browserDispatches = new LinkedList(); public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // synchronize with dispatch method so that no new messages are sent @@ -257,19 +283,18 @@ public class Queue extends BaseDestination implements Task { } } - // do recovery dispatch only if it is a browser subscription - if(sub instanceof QueueBrowserSubscription ) { - // any newly paged in messages that are not dispatched are added to pagedInPending in iterate() - doPageIn(false); + if (sub instanceof QueueBrowserSubscription ) { + QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; + + // do again in iterate to ensure new messages are dispatched + doPageIn(false); synchronized (pagedInMessages) { - RecoveryDispatch rd = new RecoveryDispatch(); - rd.messages = new ArrayList(pagedInMessages.values()); - rd.subscription = sub; - recoveries.addLast(rd); + if (!pagedInMessages.isEmpty()) { + BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values()); + browserDispatches.addLast(browserDispatch); + } } - - ((QueueBrowserSubscription)sub).incrementQueueRef(); } if (!(this.optimizedDispatch || isSlave())) { wakeup(); @@ -971,64 +996,45 @@ public class Queue extends BaseDestination implements Task { return movedCounter; } - RecoveryDispatch getNextRecoveryDispatch() { + BrowserDispatch getNextBrowserDispatch() { synchronized (pagedInMessages) { - if( recoveries.isEmpty() ) { + if( browserDispatches.isEmpty() ) { return null; } - return recoveries.removeFirst(); + return browserDispatches.removeFirst(); } } - protected boolean isRecoveryDispatchEmpty() { - synchronized (pagedInMessages) { - return recoveries.isEmpty(); - } - } /** * @return true if we would like to iterate again * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { + boolean pageInMoreMessages = false; synchronized(iteratingMutex) { - RecoveryDispatch rd; - while ((rd = getNextRecoveryDispatch()) != null) { + BrowserDispatch rd; + while ((rd = getNextBrowserDispatch()) != null) { + pageInMoreMessages = true; + try { MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(destination); + QueueBrowserSubscription browser = rd.getBrowser(); for (QueueMessageReference node : rd.messages) { - if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) { + if (!node.isAcked()) { msgContext.setMessageReference(node); - if (rd.subscription.matches(node, msgContext)) { - // Log showing message dispatching - if (LOG.isDebugEnabled()) { - LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'"); - } - rd.subscription.add(node); - } else { - // make sure it gets queued for dispatched again - dispatchLock.lock(); - try { - synchronized(pagedInPendingDispatch) { - if (!pagedInPendingDispatch.contains(node)) { - pagedInPendingDispatch.add(node); - } - } - } finally { - dispatchLock.unlock(); - } + if (browser.matches(node, msgContext)) { + browser.add(node); } } } - - if( rd.subscription instanceof QueueBrowserSubscription ) { - ((QueueBrowserSubscription)rd.subscription).decrementQueueRef(); - } - + + rd.done(); + } catch (Exception e) { - e.printStackTrace(); + LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e); } } @@ -1061,7 +1067,6 @@ public class Queue extends BaseDestination implements Task { } } - boolean pageInMoreMessages = false; synchronized (messages) { pageInMoreMessages = !messages.isEmpty(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java index daae0c7e6f..09c64272a7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java @@ -62,12 +62,9 @@ public class QueueDispatchSelector extends SimpleDispatchSelector { public boolean canSelect(Subscription subscription, MessageReference m) throws Exception { - if (subscription.isBrowser() && super.canDispatch(subscription, m)) { - return true; - } - boolean result = super.canDispatch(subscription, m) ; - if (result) { + boolean result = super.canDispatch(subscription, m); + if (result && !subscription.isBrowser()) { result = exclusiveConsumer == null || exclusiveConsumer == subscription; if (result) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java index c3c0ae01a6..aeadd4b5a9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -72,27 +72,5 @@ public class TempQueue extends Queue{ LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId()); } super.addSubscription(context, sub); - } - - public void xwakeup() { - boolean result = false; - synchronized (messages) { - result = !messages.isEmpty(); - } - if (result) { - try { - pageInMessages(false); - - } catch (Throwable e) { - LOG.error("Failed to page in more queue messages ", e); - } - } - if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) { - try { - taskRunner.wakeup(); - } catch (InterruptedException e) { - LOG.warn("Task Runner failed to wakeup ", e); - } - } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java index 7a4573f223..332b5526c7 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -161,6 +161,124 @@ public class BrokerTest extends BrokerTestSupport { assertNoMessagesLeft(connection2); } + + /* + * change the order of the above test + */ + public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + deliveryMode = DeliveryMode.NON_PERSISTENT; + + + // Setup a second connection with a queue browser. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + consumerInfo2.setPrefetchSize(10); + consumerInfo2.setBrowser(true); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.request(consumerInfo2); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(10); + connection1.request(consumerInfo1); + + // Send the messages + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + //as the messages are sent async - need to synchronize the last + //one to ensure they arrive in the order we want + connection1.request(createMessage(producerInfo, destination, deliveryMode)); + + + List messages = new ArrayList(); + + for (int i = 0; i < 4; i++) { + Message m1 = receiveMessage(connection1); + assertNotNull("m1 is null for index: " + i, m1); + messages.add(m1); + } + + // no messages present in queue browser as there were no messages when it + // was created + assertNoMessagesLeft(connection1); + assertNoMessagesLeft(connection2); + } + + public void testQueueBrowserWith2ConsumersInterleaved() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + deliveryMode = DeliveryMode.NON_PERSISTENT; + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(10); + connection1.request(consumerInfo1); + + // Send the messages + connection1.request(createMessage(producerInfo, destination, deliveryMode)); + + // Setup a second connection with a queue browser. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + consumerInfo2.setPrefetchSize(1); + consumerInfo2.setBrowser(true); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.request(consumerInfo2); + + + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + //as the messages are sent async - need to synchronize the last + //one to ensure they arrive in the order we want + connection1.request(createMessage(producerInfo, destination, deliveryMode)); + + + List messages = new ArrayList(); + + for (int i = 0; i < 4; i++) { + Message m1 = receiveMessage(connection1); + assertNotNull("m1 is null for index: " + i, m1); + messages.add(m1); + } + + for (int i = 0; i < 1; i++) { + Message m1 = messages.get(i); + Message m2 = receiveMessage(connection2); + assertNotNull("m2 is null for index: " + i, m2); + assertEquals(m1.getMessageId(), m2.getMessageId()); + connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE)); + } + + assertNoMessagesLeft(connection1); + assertNoMessagesLeft(connection2); + } + + public void initCombosForTestConsumerPrefetchAndStandardAck() { addCombinationValues("deliveryMode", new Object[] { // Integer.valueOf(DeliveryMode.NON_PERSISTENT),