diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b0418dda75..0b1001f9bb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -55,16 +55,7 @@ import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ExceptionResponse; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerAck; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.Response; +import org.apache.activemq.command.*; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; @@ -1796,10 +1787,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (dispatchSelector.canSelect(s, node)) { if (!fullConsumers.contains(s)) { if (!s.isFull()) { - // Dispatch it. - s.add(node); - target = s; - break; + if (assignMessageGroup(s, (QueueMessageReference)node)) { + // Dispatch it. + s.add(node); + target = s; + break; + } } else { // no further dispatch of list to a full consumer to // avoid out of order message receipt @@ -1841,6 +1834,60 @@ public class Queue extends BaseDestination implements Task, UsageListener { return rc; } + protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { + //QueueMessageReference node = (QueueMessageReference) m; + boolean result = true; + // Keep message groups together. + String groupId = node.getGroupID(); + int sequence = node.getGroupSequence(); + if (groupId != null) { + //MessageGroupMap messageGroupOwners = ((Queue) node + // .getRegionDestination()).getMessageGroupOwners(); + + MessageGroupMap messageGroupOwners = getMessageGroupOwners(); + // If we can own the first, then no-one else should own the + // rest. + if (sequence == 1) { + assignGroup(subscription, messageGroupOwners, node, groupId); + } else { + + // Make sure that the previous owner is still valid, we may + // need to become the new owner. + ConsumerId groupOwner; + + groupOwner = messageGroupOwners.get(groupId); + if (groupOwner == null) { + assignGroup(subscription, messageGroupOwners, node, groupId); + } else { + if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { + // A group sequence < 1 is an end of group signal. + if (sequence < 0) { + messageGroupOwners.removeGroup(groupId); + } + } else { + result = false; + } + } + } + } + + return result; + + } + + protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { + messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); + Message message = n.getMessage(); + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + try { + activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); + } catch (JMSException e) { + LOG.warn("Failed to set boolean header: " + e, e); + } + } + } + protected void pageInMessages(boolean force) throws Exception { doDispatch(doPageIn(force)); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java index 09c64272a7..481c0f206a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java @@ -65,61 +65,9 @@ public class QueueDispatchSelector extends SimpleDispatchSelector { boolean result = super.canDispatch(subscription, m); if (result && !subscription.isBrowser()) { - result = exclusiveConsumer == null - || exclusiveConsumer == subscription; - if (result) { - QueueMessageReference node = (QueueMessageReference) m; - // Keep message groups together. - String groupId = node.getGroupID(); - int sequence = node.getGroupSequence(); - if (groupId != null) { - MessageGroupMap messageGroupOwners = ((Queue) node - .getRegionDestination()).getMessageGroupOwners(); - - // If we can own the first, then no-one else should own the - // rest. - if (sequence == 1) { - assignGroup(subscription, messageGroupOwners, node,groupId); - }else { - - // Make sure that the previous owner is still valid, we may - // need to become the new owner. - ConsumerId groupOwner; - - groupOwner = messageGroupOwners.get(groupId); - if (groupOwner == null) { - assignGroup(subscription, messageGroupOwners, node,groupId); - } else { - if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { - // A group sequence < 1 is an end of group signal. - if (sequence < 0) { - messageGroupOwners.removeGroup(groupId); - } - } else { - result = false; - } - } - } - } - } + result = exclusiveConsumer == null || exclusiveConsumer == subscription; } return result; } - protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { - messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); - Message message = n.getMessage(); - if (message instanceof ActiveMQMessage) { - ActiveMQMessage activeMessage = (ActiveMQMessage)message; - try { - activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); - } catch (JMSException e) { - LOG.warn("Failed to set boolean header: " + e, e); - } - } - } - - - - } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java new file mode 100644 index 0000000000..3c0fd8e386 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.*; +import javax.jms.Queue; +import java.util.*; +import java.util.concurrent.CountDownLatch; + +/* + * Test plan: + * Producer: publish messages into a queue, with 10 message groups, closing the group with seq=-1 on message 5 and message 10 + * Consumers: 2 consumers created after all messages are sent + * + * Expected: for each group, messages 1-5 are handled by one consumer and messages 6-10 are handled by the other consumer. Messages + * 1 and 6 have the JMSXGroupFirstForConsumer property set to true. + */ +public class MessageGroupCloseTest extends TestCase { + private static final Log LOG = LogFactory.getLog(MessageGroupNewConsumerTest.class); + private Connection connection; + // Released after all messages are created + private CountDownLatch latchMessagesCreated = new CountDownLatch(1); + + private int messagesSent, messagesRecvd1, messagesRecvd2, messageGroupCount, errorCountFirstForConsumer, errorCountWrongConsumerClose, errorCountDuplicateClose; + // groupID, count + private HashMap messageGroups1 = new HashMap(); + private HashMap messageGroups2 = new HashMap(); + private HashSet closedGroups1 = new HashSet(); + private HashSet closedGroups2 = new HashSet(); + // with the prefetch too high, this bug is not realized + private static final String connStr = + //"tcp://localhost:61616"; + "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1"; + + public void testNewConsumer() throws JMSException, InterruptedException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connStr); + connection = factory.createConnection(); + connection.start(); + final String queueName = this.getClass().getSimpleName(); + final Thread producerThread = new Thread() { + public void run() { + try { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageProducer prod = session.createProducer(queue); + for (int i=0; i<10; i++) { + for (int j=0; j<10; j++) { + int seq = j + 1; + if ((j+1) % 5 == 0) { + seq = -1; + } + Message message = generateMessage(session, Integer.toString(i), seq); + prod.send(message); + session.commit(); + messagesSent++; + LOG.info("Sent message: group=" + i + ", seq="+ seq); + //Thread.sleep(20); + } + if (i % 100 == 0) { + LOG.info("Sent messages: group=" + i); + } + messageGroupCount++; + } + LOG.info(messagesSent+" messages sent"); + latchMessagesCreated.countDown(); + prod.close(); + session.close(); + } catch (Exception e) { + LOG.error("Producer failed", e); + } + } + }; + final Thread consumerThread1 = new Thread() { + public void run() { + try { + latchMessagesCreated.await(); + LOG.info("starting consumer1"); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageConsumer con1 = session.createConsumer(queue); + while(true) { + Message message = con1.receive(5000); + if (message == null) break; + LOG.info("Con1: got message "+formatMessage(message)); + checkMessage(message, "Con1", messageGroups1, closedGroups1); + session.commit(); + messagesRecvd1++; + if (messagesRecvd1 % 100 == 0) { + LOG.info("Con1: got messages count=" + messagesRecvd1); + } + //Thread.sleep(50); + } + LOG.info("Con1: total messages=" + messagesRecvd1); + LOG.info("Con1: total message groups=" + messageGroups1.size()); + con1.close(); + session.close(); + } catch (Exception e) { + LOG.error("Consumer 1 failed", e); + } + } + }; + final Thread consumerThread2 = new Thread() { + public void run() { + try { + latchMessagesCreated.await(); + LOG.info("starting consumer2"); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageConsumer con2 = session.createConsumer(queue); + while(true) { + Message message = con2.receive(5000); + if (message == null) { break; } + LOG.info("Con2: got message "+formatMessage(message)); + checkMessage(message, "Con2", messageGroups2, closedGroups2); + session.commit(); + messagesRecvd2++; + if (messagesRecvd2 % 100 == 0) { + LOG.info("Con2: got messages count=" + messagesRecvd2); + } + //Thread.sleep(50); + } + con2.close(); + session.close(); + LOG.info("Con2: total messages=" + messagesRecvd2); + LOG.info("Con2: total message groups=" + messageGroups2.size()); + } catch (Exception e) { + LOG.error("Consumer 2 failed", e); + } + } + }; + consumerThread2.start(); + consumerThread1.start(); + producerThread.start(); + // wait for threads to finish + producerThread.join(); + consumerThread1.join(); + consumerThread2.join(); + connection.close(); + // check results + + assertEquals("consumers should get all the messages", messagesSent, messagesRecvd1 + messagesRecvd2); + assertEquals("not all message groups closed for consumer 1", messageGroups1.size(), closedGroups1.size()); + assertEquals("not all message groups closed for consumer 2", messageGroups2.size(), closedGroups2.size()); + assertTrue("producer failed to send any messages", messagesSent > 0); + assertEquals("JMSXGroupFirstForConsumer not set", 0, errorCountFirstForConsumer); + assertEquals("wrong consumer got close message", 0, errorCountWrongConsumerClose); + assertEquals("consumer got duplicate close message", 0, errorCountDuplicateClose); + } + + public Message generateMessage(Session session, String groupId, int seq) throws JMSException { + TextMessage m = session.createTextMessage(); + m.setJMSType("TEST_MESSAGE"); + m.setStringProperty("JMSXGroupID", groupId); + m.setIntProperty("JMSXGroupSeq", seq); + m.setText(""); + return m; + } + public String formatMessage(Message m) { + try { + return "group="+m.getStringProperty("JMSXGroupID")+", seq="+m.getIntProperty("JMSXGroupSeq"); + } catch (Exception e) { + return e.getClass().getSimpleName()+": "+e.getMessage(); + } + } + public void checkMessage(Message m, String consumerId, Map messageGroups, Set closedGroups) throws JMSException { + String groupId = m.getStringProperty("JMSXGroupID"); + int seq = m.getIntProperty("JMSXGroupSeq"); + Integer count = messageGroups.get(groupId); + if (count == null) { + // first time seeing this group + if (!m.propertyExists("JMSXGroupFirstForConsumer") || + !m.getBooleanProperty("JMSXGroupFirstForConsumer")) { + LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq); + errorCountFirstForConsumer++; + } + if (seq == -1) { + closedGroups.add(groupId); + LOG.info(consumerId + ": wrong consumer got close message for group=" + groupId); + errorCountWrongConsumerClose++; + } + messageGroups.put(groupId, 1); + } else { + // existing group + if (closedGroups.contains(groupId)) { + // group reassigned to same consumer + closedGroups.remove(groupId); + if (!m.propertyExists("JMSXGroupFirstForConsumer") || + !m.getBooleanProperty("JMSXGroupFirstForConsumer")) { + LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq); + errorCountFirstForConsumer++; + } + if (seq == -1) { + LOG.info(consumerId + ": consumer got duplicate close message for group=" + groupId); + errorCountDuplicateClose++; + } + } + if (seq == -1) { + closedGroups.add(groupId); + } + messageGroups.put(groupId, count + 1); + } + } +} \ No newline at end of file