mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6286 extend strictOrderDispatch to retain order of redispatched messages for a single consumer
This commit is contained in:
parent
6cf8bed0c5
commit
f47b370573
|
@ -556,7 +556,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
}
|
||||
}
|
||||
|
||||
for (MessageReference ref : unAckedMessages) {
|
||||
for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) {
|
||||
MessageReference ref = unackedListIterator.next();
|
||||
// AMQ-5107: don't resend if the broker is shutting down
|
||||
if ( this.brokerService.isStopping() ) {
|
||||
break;
|
||||
|
@ -578,10 +579,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!qmr.isDropped()) {
|
||||
dispatchPendingList.addMessageForRedelivery(qmr);
|
||||
if (qmr.isDropped()) {
|
||||
unackedListIterator.remove();
|
||||
}
|
||||
}
|
||||
dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty());
|
||||
if (sub instanceof QueueBrowserSubscription) {
|
||||
((QueueBrowserSubscription)sub).decrementQueueRef();
|
||||
browserDispatches.remove(sub);
|
||||
|
|
|
@ -198,4 +198,32 @@ public class OrderedPendingList implements PendingList {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertAtHead(List<MessageReference> list) {
|
||||
if (list != null && !list.isEmpty()) {
|
||||
PendingNode newHead = null;
|
||||
PendingNode appendNode = null;
|
||||
for (MessageReference ref : list) {
|
||||
PendingNode node = new PendingNode(this, ref);
|
||||
pendingMessageHelper.addToMap(ref, node);
|
||||
if (newHead == null) {
|
||||
newHead = node;
|
||||
appendNode = node;
|
||||
continue;
|
||||
}
|
||||
appendNode.linkAfter(node);
|
||||
appendNode = node;
|
||||
}
|
||||
// insert this new list at root
|
||||
if (root == null) {
|
||||
root = newHead;
|
||||
tail = appendNode;
|
||||
} else {
|
||||
appendNode.linkAfter(root);
|
||||
root = newHead;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
|
@ -114,4 +115,6 @@ public interface PendingList extends Iterable<MessageReference> {
|
|||
public void addAll(PendingList pendingList);
|
||||
|
||||
public MessageReference get(MessageId messageId);
|
||||
|
||||
public void insertAtHead(List<MessageReference> list);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Collection;
|
|||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
|
@ -205,4 +206,14 @@ public class PrioritizedPendingList implements PendingList {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertAtHead(List<MessageReference> list) {
|
||||
// behave like addAll - pure order within priority lists is not required
|
||||
if (list != null) {
|
||||
for (MessageReference ref: list) {
|
||||
addMessageLast(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -193,6 +193,11 @@ public class QueueDispatchPendingList implements PendingList {
|
|||
return rc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertAtHead(List<MessageReference> list) {
|
||||
throw new IllegalStateException("no insertion support in: " + this.getClass().getCanonicalName());
|
||||
}
|
||||
|
||||
public void setPrioritizedMessages(boolean prioritizedMessages) {
|
||||
prioritized = prioritizedMessages;
|
||||
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
|
||||
|
@ -204,11 +209,19 @@ public class QueueDispatchPendingList implements PendingList {
|
|||
}
|
||||
}
|
||||
|
||||
public void addMessageForRedelivery(QueueMessageReference qmr) {
|
||||
redeliveredWaitingDispatch.addMessageLast(qmr);
|
||||
}
|
||||
|
||||
public boolean hasRedeliveries(){
|
||||
return !redeliveredWaitingDispatch.isEmpty();
|
||||
}
|
||||
|
||||
public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
|
||||
if (noConsumers) {
|
||||
// a single consumer can expect repeatable redelivery order irrespective
|
||||
// of transaction or prefetch boundaries
|
||||
redeliveredWaitingDispatch.insertAtHead(list);
|
||||
} else {
|
||||
for (MessageReference ref : list) {
|
||||
redeliveredWaitingDispatch.addMessageLast(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,9 +20,12 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
|
@ -268,6 +271,53 @@ public class OrderPendingListTest {
|
|||
list.addAll(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertAtHead() throws Exception {
|
||||
OrderedPendingList underTest = new OrderedPendingList();
|
||||
|
||||
TestPendingList source = new TestPendingList();
|
||||
source.addMessageLast(new TestMessageReference(1));
|
||||
source.addMessageLast(new TestMessageReference(2));
|
||||
source.addMessageLast(new TestMessageReference(3));
|
||||
source.addMessageLast(new TestMessageReference(4));
|
||||
source.addMessageLast(new TestMessageReference(5));
|
||||
|
||||
assertTrue(underTest.isEmpty());
|
||||
assertEquals(5, source.size());
|
||||
|
||||
LinkedList linkedList = new LinkedList();
|
||||
linkedList.addAll(source.values());
|
||||
underTest.insertAtHead(linkedList);
|
||||
assertEquals(5, underTest.size());
|
||||
|
||||
underTest.insertAtHead(null);
|
||||
|
||||
linkedList.clear();
|
||||
|
||||
Iterator<MessageReference> iterator = underTest.iterator();
|
||||
for (int i=0; i < 2 && iterator.hasNext(); i++ ) {
|
||||
MessageReference ref = iterator.next();
|
||||
linkedList.addLast(ref);
|
||||
iterator.remove();
|
||||
assertEquals(ref.getMessageId().getProducerSequenceId(), i + 1);
|
||||
}
|
||||
|
||||
assertEquals(3, underTest.size());
|
||||
|
||||
underTest.insertAtHead(linkedList);
|
||||
assertEquals(5, underTest.size());
|
||||
|
||||
iterator = underTest.iterator();
|
||||
for (int i=0; iterator.hasNext(); i++ ) {
|
||||
MessageReference ref = iterator.next();
|
||||
linkedList.addLast(ref);
|
||||
iterator.remove();
|
||||
assertEquals(ref.getMessageId().getProducerSequenceId(), i + 1);
|
||||
}
|
||||
assertEquals(0, underTest.size());
|
||||
|
||||
}
|
||||
|
||||
static class TestPendingList implements PendingList {
|
||||
|
||||
private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>();
|
||||
|
@ -349,6 +399,12 @@ public class OrderPendingListTest {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertAtHead(List<MessageReference> list) {
|
||||
theList.addAll(list);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static class TestMessageReference implements MessageReference {
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class QueueOrderSingleTransactedConsumerTest {
|
||||
|
||||
BrokerService broker = null;
|
||||
ActiveMQQueue dest = new ActiveMQQueue("Queue");
|
||||
|
||||
@Test
|
||||
public void testSingleConsumerTxRepeat() throws Exception {
|
||||
|
||||
publishMessages(100);
|
||||
|
||||
consumeVerifyOrderAndRollback(20);
|
||||
consumeVerifyOrderAndRollback(10);
|
||||
consumeVerifyOrderAndRollback(5);
|
||||
}
|
||||
|
||||
private void consumeVerifyOrderAndRollback(final int num) throws Exception {
|
||||
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageConsumer messageConsumer = session.createConsumer(dest);
|
||||
for (int i=0; i<num; ) {
|
||||
Message message = messageConsumer.receive(4000);
|
||||
if (message != null) {
|
||||
assertEquals(i, message.getIntProperty("Order"));
|
||||
i++;
|
||||
}
|
||||
}
|
||||
session.rollback();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
private void publishMessages(int num) throws Exception {
|
||||
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer messageProducer = session.createProducer(dest);
|
||||
TextMessage textMessage = session.createTextMessage("A");
|
||||
for (int i=0; i<num; i++) {
|
||||
textMessage.setIntProperty("Order", i);
|
||||
messageProducer.send(textMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
|
||||
// add the policy entries
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
|
||||
PolicyEntry pe = new PolicyEntry();
|
||||
pe.setExpireMessagesPeriod(0);
|
||||
|
||||
pe.setQueuePrefetch(0); // make incremental dispatch to the consumers explicit
|
||||
pe.setStrictOrderDispatch(true); // force redeliveries back to the head of the queue
|
||||
|
||||
pe.setQueue(">");
|
||||
entries.add(pe);
|
||||
policyMap.setPolicyEntries(entries);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
|
||||
broker.addConnector("tcp://0.0.0.0:0");
|
||||
broker.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue