little refactor of recovery dispatch as now only used for browser dispatch

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@745456 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-18 09:44:31 +00:00
parent 9ad6c089fa
commit 9216c18c73
4 changed files with 175 additions and 77 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
@ -214,12 +215,37 @@ public class Queue extends BaseDestination implements Task {
} }
} }
class RecoveryDispatch { /*
public ArrayList<QueueMessageReference> messages; * Holder for subscription and pagedInMessages as a browser
public Subscription subscription; * needs access to existing messages in the queue that have
* already been dispatched
*/
class BrowserDispatch {
ArrayList<QueueMessageReference> messages;
QueueBrowserSubscription browser;
public BrowserDispatch(QueueBrowserSubscription browserSubscription,
Collection<QueueMessageReference> values) {
messages = new ArrayList<QueueMessageReference>(values);
browser = browserSubscription;
browser.incrementQueueRef();
} }
LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>(); void done() {
try {
browser.decrementQueueRef();
} catch (Exception e) {
LOG.warn("decrement ref on browser: " + browser, e);
}
}
public QueueBrowserSubscription getBrowser() {
return browser;
}
}
LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
@ -257,19 +283,18 @@ public class Queue extends BaseDestination implements Task {
} }
} }
// do recovery dispatch only if it is a browser subscription
if (sub instanceof QueueBrowserSubscription ) { if (sub instanceof QueueBrowserSubscription ) {
// any newly paged in messages that are not dispatched are added to pagedInPending in iterate() QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
// do again in iterate to ensure new messages are dispatched
doPageIn(false); doPageIn(false);
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
RecoveryDispatch rd = new RecoveryDispatch(); if (!pagedInMessages.isEmpty()) {
rd.messages = new ArrayList<QueueMessageReference>(pagedInMessages.values()); BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
rd.subscription = sub; browserDispatches.addLast(browserDispatch);
recoveries.addLast(rd); }
} }
((QueueBrowserSubscription)sub).incrementQueueRef();
} }
if (!(this.optimizedDispatch || isSlave())) { if (!(this.optimizedDispatch || isSlave())) {
wakeup(); wakeup();
@ -971,64 +996,45 @@ public class Queue extends BaseDestination implements Task {
return movedCounter; return movedCounter;
} }
RecoveryDispatch getNextRecoveryDispatch() { BrowserDispatch getNextBrowserDispatch() {
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
if( recoveries.isEmpty() ) { if( browserDispatches.isEmpty() ) {
return null; return null;
} }
return recoveries.removeFirst(); return browserDispatches.removeFirst();
} }
} }
protected boolean isRecoveryDispatchEmpty() {
synchronized (pagedInMessages) {
return recoveries.isEmpty();
}
}
/** /**
* @return true if we would like to iterate again * @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate() * @see org.apache.activemq.thread.Task#iterate()
*/ */
public boolean iterate() { public boolean iterate() {
boolean pageInMoreMessages = false;
synchronized(iteratingMutex) { synchronized(iteratingMutex) {
RecoveryDispatch rd; BrowserDispatch rd;
while ((rd = getNextRecoveryDispatch()) != null) { while ((rd = getNextBrowserDispatch()) != null) {
pageInMoreMessages = true;
try { try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination); msgContext.setDestination(destination);
QueueBrowserSubscription browser = rd.getBrowser();
for (QueueMessageReference node : rd.messages) { for (QueueMessageReference node : rd.messages) {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) { if (!node.isAcked()) {
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
if (rd.subscription.matches(node, msgContext)) { if (browser.matches(node, msgContext)) {
// Log showing message dispatching browser.add(node);
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'");
}
rd.subscription.add(node);
} else {
// make sure it gets queued for dispatched again
dispatchLock.lock();
try {
synchronized(pagedInPendingDispatch) {
if (!pagedInPendingDispatch.contains(node)) {
pagedInPendingDispatch.add(node);
}
}
} finally {
dispatchLock.unlock();
}
} }
} }
} }
if( rd.subscription instanceof QueueBrowserSubscription ) { rd.done();
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
} }
} }
@ -1061,7 +1067,6 @@ public class Queue extends BaseDestination implements Task {
} }
} }
boolean pageInMoreMessages = false;
synchronized (messages) { synchronized (messages) {
pageInMoreMessages = !messages.isEmpty(); pageInMoreMessages = !messages.isEmpty();
} }

View File

@ -62,12 +62,9 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
public boolean canSelect(Subscription subscription, public boolean canSelect(Subscription subscription,
MessageReference m) throws Exception { MessageReference m) throws Exception {
if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
return true;
}
boolean result = super.canDispatch(subscription, m); boolean result = super.canDispatch(subscription, m);
if (result) { if (result && !subscription.isBrowser()) {
result = exclusiveConsumer == null result = exclusiveConsumer == null
|| exclusiveConsumer == subscription; || exclusiveConsumer == subscription;
if (result) { if (result) {

View File

@ -73,26 +73,4 @@ public class TempQueue extends Queue{
} }
super.addSubscription(context, sub); super.addSubscription(context, sub);
} }
public void xwakeup() {
boolean result = false;
synchronized (messages) {
result = !messages.isEmpty();
}
if (result) {
try {
pageInMessages(false);
} catch (Throwable e) {
LOG.error("Failed to page in more queue messages ", e);
}
}
if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
LOG.warn("Task Runner failed to wakeup ", e);
}
}
}
} }

View File

@ -161,6 +161,124 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection2); assertNoMessagesLeft(connection2);
} }
/*
* change the order of the above test
*/
public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
deliveryMode = DeliveryMode.NON_PERSISTENT;
// Setup a second connection with a queue browser.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(10);
consumerInfo2.setBrowser(true);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.request(consumerInfo2);
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(10);
connection1.request(consumerInfo1);
// Send the messages
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
//as the messages are sent async - need to synchronize the last
//one to ensure they arrive in the order we want
connection1.request(createMessage(producerInfo, destination, deliveryMode));
List<Message> messages = new ArrayList<Message>();
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull("m1 is null for index: " + i, m1);
messages.add(m1);
}
// no messages present in queue browser as there were no messages when it
// was created
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
}
public void testQueueBrowserWith2ConsumersInterleaved() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
deliveryMode = DeliveryMode.NON_PERSISTENT;
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo);
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setPrefetchSize(10);
connection1.request(consumerInfo1);
// Send the messages
connection1.request(createMessage(producerInfo, destination, deliveryMode));
// Setup a second connection with a queue browser.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
consumerInfo2.setPrefetchSize(1);
consumerInfo2.setBrowser(true);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
connection2.request(consumerInfo2);
connection1.send(createMessage(producerInfo, destination, deliveryMode));
connection1.send(createMessage(producerInfo, destination, deliveryMode));
//as the messages are sent async - need to synchronize the last
//one to ensure they arrive in the order we want
connection1.request(createMessage(producerInfo, destination, deliveryMode));
List<Message> messages = new ArrayList<Message>();
for (int i = 0; i < 4; i++) {
Message m1 = receiveMessage(connection1);
assertNotNull("m1 is null for index: " + i, m1);
messages.add(m1);
}
for (int i = 0; i < 1; i++) {
Message m1 = messages.get(i);
Message m2 = receiveMessage(connection2);
assertNotNull("m2 is null for index: " + i, m2);
assertEquals(m1.getMessageId(), m2.getMessageId());
connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
}
assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2);
}
public void initCombosForTestConsumerPrefetchAndStandardAck() { public void initCombosForTestConsumerPrefetchAndStandardAck() {
addCombinationValues("deliveryMode", new Object[] { addCombinationValues("deliveryMode", new Object[] {
// Integer.valueOf(DeliveryMode.NON_PERSISTENT), // Integer.valueOf(DeliveryMode.NON_PERSISTENT),