From 156372ab40bbf2ccb1367059c44b2f4f69130e7c Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Mon, 22 Jan 2018 13:33:30 +0100 Subject: [PATCH] ARTEMIS-1625 fix moving messages --- .../management/impl/QueueControlImpl.java | 4 +- .../artemis/core/postoffice/PostOffice.java | 9 ++- .../core/postoffice/impl/PostOfficeImpl.java | 23 +++++-- .../activemq/artemis/core/server/Queue.java | 10 +-- .../artemis/core/server/impl/QueueImpl.java | 25 ++++---- .../impl/ScheduledDeliveryHandlerTest.java | 13 ++-- .../management/QueueControlTest.java | 64 +++++++++++++++++++ .../tests/integration/paging/PagingTest.java | 2 +- .../unit/core/postoffice/impl/FakeQueue.java | 14 ++-- .../server/impl/fakes/FakePostOffice.java | 10 +-- 10 files changed, 125 insertions(+), 49 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 9f051fd99e..0a1c3850c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -701,7 +701,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); } - return queue.moveReference(messageID, binding.getAddress(), rejectDuplicates); + return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates); } finally { blockOnIO(); } @@ -730,7 +730,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); } - int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates); + int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, binding); return retValue; } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 9eee907f72..d015eda446 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -111,6 +111,12 @@ public interface PostOffice extends ActiveMQComponent { boolean direct, boolean rejectDuplicates) throws Exception; + RoutingStatus route(Message message, + Transaction tx, + boolean direct, + boolean rejectDuplicates, + Binding binding) throws Exception; + RoutingStatus route(Message message, RoutingContext context, boolean direct) throws Exception; @@ -118,7 +124,8 @@ public interface PostOffice extends ActiveMQComponent { RoutingStatus route(Message message, RoutingContext context, boolean direct, - boolean rejectDuplicates) throws Exception; + boolean rejectDuplicates, + Binding binding) throws Exception; MessageReference reroute(Message message, Queue queue, Transaction tx) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index d69a782b1b..13ef9019f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -716,26 +716,36 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return route(message, new RoutingContextImpl(tx), direct); } + @Override + public RoutingStatus route(Message message, + Transaction tx, + boolean direct, + boolean rejectDuplicates) throws Exception { + return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates, null); + } + @Override public RoutingStatus route(final Message message, final Transaction tx, final boolean direct, - final boolean rejectDuplicates) throws Exception { - return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates); + final boolean rejectDuplicates, + final Binding binding) throws Exception { + return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates, binding); } @Override public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct) throws Exception { - return route(message, context, direct, true); + return route(message, context, direct, true, null); } @Override public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct, - boolean rejectDuplicates) throws Exception { + boolean rejectDuplicates, + final Binding bindingMove) throws Exception { RoutingStatus result = RoutingStatus.OK; // Sanity check @@ -769,8 +779,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // bindings = addressManager.getBindingsForRoutingAddress(address); // } } - - if (bindings != null) { + if (bindingMove != null) { + bindingMove.route(message, context); + } else if (bindings != null) { bindings.route(message, context); } else { // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 844a49dc8c..d4ec406c02 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -183,16 +184,15 @@ public interface Queue extends Bindable,CriticalComponent { int changeReferencesPriority(Filter filter, byte newPriority) throws Exception; - boolean moveReference(long messageID, SimpleString toAddress) throws Exception; + boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception; - boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception; - - int moveReferences(Filter filter, SimpleString toAddress) throws Exception; + int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception; int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, - boolean rejectDuplicates) throws Exception; + boolean rejectDuplicates, + Binding binding) throws Exception; int retryMessages(Filter filter) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index cb94aed5be..f6b6e000e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1259,7 +1259,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (logger.isTraceEnabled()) { logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName()); } - move(null, messageExpiryAddress, ref, false, AckReason.EXPIRED); + move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); @@ -1750,14 +1750,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - @Override - public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception { - return moveReference(messageID, toAddress, false); - } - @Override public synchronized boolean moveReference(final long messageID, final SimpleString toAddress, + final Binding binding, final boolean rejectDuplicate) throws Exception { try (LinkedListIterator iter = iterator()) { while (iter.hasNext()) { @@ -1767,7 +1763,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { refRemoved(ref); incDelivering(); try { - move(null, toAddress, ref, rejectDuplicate, AckReason.NORMAL); + move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL); } catch (Exception e) { decDelivering(); throw e; @@ -1780,15 +1776,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception { - return moveReferences(DEFAULT_FLUSH_LIMIT, filter, toAddress, false); + public int moveReferences(final Filter filter, final SimpleString toAddress, Binding binding) throws Exception { + return moveReferences(DEFAULT_FLUSH_LIMIT, filter, toAddress, false, binding); } @Override public synchronized int moveReferences(final int flushLimit, final Filter filter, final SimpleString toAddress, - final boolean rejectDuplicates) throws Exception { + final boolean rejectDuplicates, + final Binding binding) throws Exception { final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress); return iterQueue(flushLimit, filter, new QueueIterateAction() { @@ -1810,7 +1807,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (!ignored) { - move(toAddress, tx, ref, false, rejectDuplicates); + move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL); + //move(toAddress, tx, ref, false, rejectDuplicates); } } }); @@ -2648,7 +2646,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { ref.acknowledge(tx, AckReason.KILLED); } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(tx, deadLetterAddress, ref, false, AckReason.KILLED); + move(tx, deadLetterAddress,null, ref, false, AckReason.KILLED); } } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name); @@ -2659,6 +2657,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private void move(final Transaction originalTX, final SimpleString address, + final Binding binding, final MessageReference ref, final boolean rejectDuplicate, final AckReason reason) throws Exception { @@ -2675,7 +2674,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { copyMessage.setAddress(address); - postOffice.route(copyMessage, tx, false, rejectDuplicate); + postOffice.route(copyMessage, tx, false, rejectDuplicate, binding); acknowledge(tx, ref, reason); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5cfac12bf0..a492efddc3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -1135,17 +1136,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public boolean moveReference(long messageID, SimpleString toAddress) throws Exception { + public boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception { return false; } @Override - public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception { - return false; - } - - @Override - public int moveReferences(Filter filter, SimpleString toAddress) throws Exception { + public int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception { return 0; } @@ -1153,7 +1149,8 @@ public class ScheduledDeliveryHandlerTest extends Assert { public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, - boolean rejectDuplicates) throws Exception { + boolean rejectDuplicates, + Binding binding) throws Exception { return 0; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 83d2d1cdcc..f9adc2fdb2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -994,6 +994,70 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(otherQueue); } + @Test + public void testMoveMessages2() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queueA = new SimpleString("A"); + SimpleString queueB = new SimpleString("B"); + SimpleString queueC = new SimpleString("C"); + + server.createQueue(address, RoutingType.MULTICAST, queueA, null, true, false); + server.createQueue(address, RoutingType.MULTICAST, queueB, null, true, false); + server.createQueue(address, RoutingType.MULTICAST, queueC, null, true, false); + + + QueueControl queueControlA = createManagementControl(address, queueA); + QueueControl queueControlB = createManagementControl(address, queueB); + QueueControl queueControlC = createManagementControl(address, queueC); + + // send two messages on queueA + + queueControlA.sendMessage(new HashMap(), Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword"); + queueControlA.sendMessage(new HashMap(), Message.BYTES_TYPE, Base64.encodeBytes("theBody2".getBytes()), true, "myUser", "myPassword"); + + Assert.assertEquals(2, getMessageCount(queueControlA)); + Assert.assertEquals(0, getMessageCount(queueControlB)); + Assert.assertEquals(0, getMessageCount(queueControlC)); + + // move 2 messages from queueA to queueB + queueControlA.moveMessages(null, queueB.toString()); + Thread.sleep(500); + Assert.assertEquals(0, getMessageCount(queueControlA)); + Assert.assertEquals(2, getMessageCount(queueControlB)); + + // move 1 message to queueC + queueControlA.sendMessage(new HashMap(), Message.BYTES_TYPE, Base64.encodeBytes("theBody3".getBytes()), true, "myUser", "myPassword"); + Assert.assertEquals(1, getMessageCount(queueControlA)); + queueControlA.moveMessages(null, queueC.toString()); + Assert.assertEquals(1, getMessageCount(queueControlC)); + Assert.assertEquals(0, getMessageCount(queueControlA)); + + //move all messages back to A + queueControlB.moveMessages(null, queueA.toString()); + Assert.assertEquals(2, getMessageCount(queueControlA)); + Assert.assertEquals(0, getMessageCount(queueControlB)); + + queueControlC.moveMessages(null, queueA.toString()); + Assert.assertEquals(3, getMessageCount(queueControlA)); + Assert.assertEquals(0, getMessageCount(queueControlC)); + + // consume the message from queueA + ClientConsumer consumer = session.createConsumer(queueA); + ClientMessage m1 = consumer.receive(500); + ClientMessage m2 = consumer.receive(500); + ClientMessage m3 = consumer.receive(500); + + m1.acknowledge(); + m2.acknowledge(); + m3.acknowledge(); + + consumer.close(); + session.deleteQueue(queueA); + session.deleteQueue(queueB); + session.deleteQueue(queueC); + + } + @Test public void testMoveMessagesToUnknownQueue() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 542d94d99f..fc4818293c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -5632,7 +5632,7 @@ public class PagingTest extends ActiveMQTestBase { Queue queue = server.locateQueue(new SimpleString("Q1")); - queue.moveReferences(10, (Filter) null, new SimpleString("Q2"), false); + queue.moveReferences(10, (Filter) null, new SimpleString("Q2"), false, server.getPostOffice().getBinding(new SimpleString("Q2"))); waitForNotPaging(store); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index f654ed5782..7b378797f8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -480,13 +481,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception { - // no-op - return false; - } - - @Override - public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception { + public int moveReferences(final Filter filter, final SimpleString toAddress, Binding binding) throws Exception { // no-op return 0; } @@ -595,7 +590,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception { + public boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception { // no-op return false; } @@ -614,7 +609,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, - boolean rejectDuplicates) throws Exception { + boolean rejectDuplicates, + Binding binding) throws Exception { return 0; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index f9e413a0b9..1674df5a0a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -202,16 +202,18 @@ public class FakePostOffice implements PostOffice { return RoutingStatus.OK; } + @Override + public RoutingStatus route(Message message, Transaction tx, boolean direct, boolean rejectDuplicates, Binding binding) throws Exception { + return null; + } + @Override public RoutingStatus route(Message message, RoutingContext context, boolean direct) throws Exception { return null; } @Override - public RoutingStatus route(Message message, - RoutingContext context, - boolean direct, - boolean rejectDuplicates) throws Exception { + public RoutingStatus route(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, Binding binding) throws Exception { return null; }