ARTEMIS-1625 fix moving messages

This commit is contained in:
Stanislav Knot 2018-01-22 13:33:30 +01:00 committed by Clebert Suconic
parent b740181929
commit 156372ab40
10 changed files with 125 additions and 49 deletions

View File

@ -701,7 +701,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
}
return queue.moveReference(messageID, binding.getAddress(), rejectDuplicates);
return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
} finally {
blockOnIO();
}
@ -730,7 +730,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
}
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates);
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, binding);
return retValue;
} finally {

View File

@ -111,6 +111,12 @@ public interface PostOffice extends ActiveMQComponent {
boolean direct,
boolean rejectDuplicates) throws Exception;
RoutingStatus route(Message message,
Transaction tx,
boolean direct,
boolean rejectDuplicates,
Binding binding) throws Exception;
RoutingStatus route(Message message,
RoutingContext context,
boolean direct) throws Exception;
@ -118,7 +124,8 @@ public interface PostOffice extends ActiveMQComponent {
RoutingStatus route(Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception;
boolean rejectDuplicates,
Binding binding) throws Exception;
MessageReference reroute(Message message, Queue queue, Transaction tx) throws Exception;

View File

@ -716,26 +716,36 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return route(message, new RoutingContextImpl(tx), direct);
}
@Override
public RoutingStatus route(Message message,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception {
return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates, null);
}
@Override
public RoutingStatus route(final Message message,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception {
return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
final boolean rejectDuplicates,
final Binding binding) throws Exception {
return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates, binding);
}
@Override
public RoutingStatus route(final Message message,
final RoutingContext context,
final boolean direct) throws Exception {
return route(message, context, direct, true);
return route(message, context, direct, true, null);
}
@Override
public RoutingStatus route(final Message message,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
boolean rejectDuplicates,
final Binding bindingMove) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// Sanity check
@ -769,8 +779,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// bindings = addressManager.getBindingsForRoutingAddress(address);
// }
}
if (bindings != null) {
if (bindingMove != null) {
bindingMove.route(message, context);
} else if (bindings != null) {
bindings.route(message, context);
} else {
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -183,16 +184,15 @@ public interface Queue extends Bindable,CriticalComponent {
int changeReferencesPriority(Filter filter, byte newPriority) throws Exception;
boolean moveReference(long messageID, SimpleString toAddress) throws Exception;
boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception;
boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception;
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception;
int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates) throws Exception;
boolean rejectDuplicates,
Binding binding) throws Exception;
int retryMessages(Filter filter) throws Exception;

View File

@ -1259,7 +1259,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName());
}
move(null, messageExpiryAddress, ref, false, AckReason.EXPIRED);
move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED);
} else {
if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
@ -1750,14 +1750,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
@Override
public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception {
return moveReference(messageID, toAddress, false);
}
@Override
public synchronized boolean moveReference(final long messageID,
final SimpleString toAddress,
final Binding binding,
final boolean rejectDuplicate) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
@ -1767,7 +1763,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refRemoved(ref);
incDelivering();
try {
move(null, toAddress, ref, rejectDuplicate, AckReason.NORMAL);
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
} catch (Exception e) {
decDelivering();
throw e;
@ -1780,15 +1776,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception {
return moveReferences(DEFAULT_FLUSH_LIMIT, filter, toAddress, false);
public int moveReferences(final Filter filter, final SimpleString toAddress, Binding binding) throws Exception {
return moveReferences(DEFAULT_FLUSH_LIMIT, filter, toAddress, false, binding);
}
@Override
public synchronized int moveReferences(final int flushLimit,
final Filter filter,
final SimpleString toAddress,
final boolean rejectDuplicates) throws Exception {
final boolean rejectDuplicates,
final Binding binding) throws Exception {
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
return iterQueue(flushLimit, filter, new QueueIterateAction() {
@ -1810,7 +1807,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!ignored) {
move(toAddress, tx, ref, false, rejectDuplicates);
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
//move(toAddress, tx, ref, false, rejectDuplicates);
}
}
});
@ -2648,7 +2646,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ref.acknowledge(tx, AckReason.KILLED);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, ref, false, AckReason.KILLED);
move(tx, deadLetterAddress,null, ref, false, AckReason.KILLED);
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
@ -2659,6 +2657,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void move(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref,
final boolean rejectDuplicate,
final AckReason reason) throws Exception {
@ -2675,7 +2674,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.setAddress(address);
postOffice.route(copyMessage, tx, false, rejectDuplicate);
postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
acknowledge(tx, ref, reason);

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -1135,17 +1136,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public boolean moveReference(long messageID, SimpleString toAddress) throws Exception {
public boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception {
return false;
}
@Override
public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception {
return false;
}
@Override
public int moveReferences(Filter filter, SimpleString toAddress) throws Exception {
public int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception {
return 0;
}
@ -1153,7 +1149,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates) throws Exception {
boolean rejectDuplicates,
Binding binding) throws Exception {
return 0;
}

View File

@ -994,6 +994,70 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(otherQueue);
}
@Test
public void testMoveMessages2() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queueA = new SimpleString("A");
SimpleString queueB = new SimpleString("B");
SimpleString queueC = new SimpleString("C");
server.createQueue(address, RoutingType.MULTICAST, queueA, null, true, false);
server.createQueue(address, RoutingType.MULTICAST, queueB, null, true, false);
server.createQueue(address, RoutingType.MULTICAST, queueC, null, true, false);
QueueControl queueControlA = createManagementControl(address, queueA);
QueueControl queueControlB = createManagementControl(address, queueB);
QueueControl queueControlC = createManagementControl(address, queueC);
// send two messages on queueA
queueControlA.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
queueControlA.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody2".getBytes()), true, "myUser", "myPassword");
Assert.assertEquals(2, getMessageCount(queueControlA));
Assert.assertEquals(0, getMessageCount(queueControlB));
Assert.assertEquals(0, getMessageCount(queueControlC));
// move 2 messages from queueA to queueB
queueControlA.moveMessages(null, queueB.toString());
Thread.sleep(500);
Assert.assertEquals(0, getMessageCount(queueControlA));
Assert.assertEquals(2, getMessageCount(queueControlB));
// move 1 message to queueC
queueControlA.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody3".getBytes()), true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControlA));
queueControlA.moveMessages(null, queueC.toString());
Assert.assertEquals(1, getMessageCount(queueControlC));
Assert.assertEquals(0, getMessageCount(queueControlA));
//move all messages back to A
queueControlB.moveMessages(null, queueA.toString());
Assert.assertEquals(2, getMessageCount(queueControlA));
Assert.assertEquals(0, getMessageCount(queueControlB));
queueControlC.moveMessages(null, queueA.toString());
Assert.assertEquals(3, getMessageCount(queueControlA));
Assert.assertEquals(0, getMessageCount(queueControlC));
// consume the message from queueA
ClientConsumer consumer = session.createConsumer(queueA);
ClientMessage m1 = consumer.receive(500);
ClientMessage m2 = consumer.receive(500);
ClientMessage m3 = consumer.receive(500);
m1.acknowledge();
m2.acknowledge();
m3.acknowledge();
consumer.close();
session.deleteQueue(queueA);
session.deleteQueue(queueB);
session.deleteQueue(queueC);
}
@Test
public void testMoveMessagesToUnknownQueue() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -5632,7 +5632,7 @@ public class PagingTest extends ActiveMQTestBase {
Queue queue = server.locateQueue(new SimpleString("Q1"));
queue.moveReferences(10, (Filter) null, new SimpleString("Q2"), false);
queue.moveReferences(10, (Filter) null, new SimpleString("Q2"), false, server.getPostOffice().getBinding(new SimpleString("Q2")));
waitForNotPaging(store);

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -480,13 +481,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception {
// no-op
return false;
}
@Override
public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception {
public int moveReferences(final Filter filter, final SimpleString toAddress, Binding binding) throws Exception {
// no-op
return 0;
}
@ -595,7 +590,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception {
public boolean moveReference(long messageID, SimpleString toAddress, Binding binding, boolean rejectDuplicates) throws Exception {
// no-op
return false;
}
@ -614,7 +609,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates) throws Exception {
boolean rejectDuplicates,
Binding binding) throws Exception {
return 0;
}

View File

@ -202,16 +202,18 @@ public class FakePostOffice implements PostOffice {
return RoutingStatus.OK;
}
@Override
public RoutingStatus route(Message message, Transaction tx, boolean direct, boolean rejectDuplicates, Binding binding) throws Exception {
return null;
}
@Override
public RoutingStatus route(Message message, RoutingContext context, boolean direct) throws Exception {
return null;
}
@Override
public RoutingStatus route(Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception {
public RoutingStatus route(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, Binding binding) throws Exception {
return null;
}