diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 39b2a957f3..2efd82ddbc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -87,7 +87,6 @@ public class Queue implements Destination, Task { private int garbageSizeBeforeCollection = 1000; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private final MessageStore store; - private int highestSubscriptionPriority = Integer.MIN_VALUE; private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private int maximumPagedInMessages = garbageSizeBeforeCollection * 2; @@ -179,20 +178,12 @@ public class Queue implements Destination, Task { */ public boolean lock(MessageReference node,LockOwner lockOwner){ synchronized(exclusiveLockMutex){ - if(exclusiveOwner==lockOwner){ - return true; - } - if(exclusiveOwner!=null){ - return false; - } - if(lockOwner.getLockPriority() exclusiveOwner.getLockPriority()) { + exclusiveOwner = owner; + } + } + } + } // page in messages doPageIn(); // synchronize with dispatch method so that no new messages are sent // while - // setting up a subscription. avoid out of order messages, duplicates + // setting up a subscription. avoid out of order messages, + // duplicates // etc. dispatchValve.turnOff(); - if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) { - highestSubscriptionPriority = sub.getConsumerInfo().getPriority(); - } msgContext.setDestination(destination); synchronized(pagedInMessages){ // Add all the matching messages in the queue to the @@ -261,14 +255,29 @@ public class Queue implements Destination, Task { try { synchronized (consumers) { - consumers.remove(sub); - if (consumers.isEmpty()) { - messages.gc(); - } - } - sub.remove(context, this); + consumers.remove(sub); + if (sub.getConsumerInfo().isExclusive()) { + LockOwner owner = (LockOwner) sub; + // Did we loose the exclusive owner?? + if (exclusiveOwner == owner) { - highestSubscriptionPriority = calcHighestSubscriptionPriority(); + // Find the exclusive consumer with the higest Lock + // Priority. + exclusiveOwner = null; + for (Iterator iter = consumers.iterator(); iter.hasNext();) { + Subscription s = (Subscription) iter.next(); + LockOwner so = (LockOwner) s; + if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority())) + exclusiveOwner = so; + } + } + } + if (consumers.isEmpty()) { + messages.gc(); + } + + } + sub.remove(context, this); boolean wasExclusiveOwner = false; if (exclusiveOwner == sub) { @@ -629,20 +638,6 @@ public class Queue implements Destination, Task { return result; } - - private int calcHighestSubscriptionPriority() { - int rc = Integer.MIN_VALUE; - synchronized (consumers) { - for (Iterator iter = consumers.iterator(); iter.hasNext();) { - Subscription sub = (Subscription) iter.next(); - if (sub.getConsumerInfo().getPriority() > rc) { - rc = sub.getConsumerInfo().getPriority(); - } - } - } - return rc; - } - public MessageStore getMessageStore() { return store; } diff --git a/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java new file mode 100644 index 0000000000..8bb6e63c04 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java @@ -0,0 +1,348 @@ +/** +* +* Copyright 2005-2006 The Apache Software Foundation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; + +public class ExclusiveConsumerTest extends TestCase { + + private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true"; + + public ExclusiveConsumerTest(String name) { + super(name); + } + + protected void setUp() throws Exception { + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + private Connection createConnection(final boolean start) throws JMSException { + ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + Connection conn = cf.createConnection(); + if (start) { + conn.start(); + } + return conn; + } + + public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + //TODO need two send a 2nd message - bug AMQ-1024 + //producer.send(msg); + Thread.sleep(100); + + //Verify exclusive consumer receives the message. + Assert.assertNotNull(exclusiveConsumer.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + //Verify exclusive consumer receives the message. + Assert.assertNotNull(exclusiveConsumer.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession1 = null; + Session exclusiveSession2 = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue); + MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + //Verify exclusive consumer receives the message. + Assert.assertNotNull(exclusiveConsumer1.receive(100)); + Assert.assertNull(exclusiveConsumer2.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer takes over + exclusiveConsumer1.close(); + + producer.send(msg); + producer.send(msg); + + Assert.assertNotNull(exclusiveConsumer2.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession1 = null; + Session exclusiveSession2 = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + //Verify exclusive consumer receives the message. + Assert.assertNotNull(exclusiveConsumer1.receive(100)); + Assert.assertNull(exclusiveConsumer2.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer takes over + exclusiveConsumer1.close(); + + producer.send(msg); + producer.send(msg); + + Assert.assertNotNull(exclusiveConsumer2.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + //Verify exclusive consumer receives the message. + Assert.assertNotNull(exclusiveConsumer.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer takes over + exclusiveConsumer.close(); + + producer.send(msg); + + Assert.assertNotNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + //Verify exclusive consumer receives the message. + Assert.assertNotNull(exclusiveConsumer.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer takes over + exclusiveConsumer.close(); + + producer.send(msg); + + // Verify other non-exclusive consumer receices the message. + Assert.assertNotNull(fallbackConsumer.receive(100)); + + // Create exclusive consumer to determine if it will start receiving the messages. + exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + producer.send(msg); + Assert.assertNotNull(exclusiveConsumer.receive(100)); + Assert.assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } +}