diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java index 7f7b7d0d83..7a1feb53ec 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java @@ -105,10 +105,207 @@ public class Transfer extends InputAbstract { @Option(name = "--target-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of
::. (Default: queue://TEST)") String targetTopic; - boolean isCopy() { + @Option(name = "--message-count", description = "Number of messages to transfer.") + int messageCount = Integer.MAX_VALUE; + + public String getSourceURL() { + return sourceURL; + } + + public Transfer setSourceURL(String sourceURL) { + this.sourceURL = sourceURL; + return this; + } + + public String getSourceUser() { + return sourceUser; + } + + public Transfer setSourceUser(String sourceUser) { + this.sourceUser = sourceUser; + return this; + } + + public String getSourcePassword() { + return sourcePassword; + } + + public Transfer setSourcePassword(String sourcePassword) { + this.sourcePassword = sourcePassword; + return this; + } + + public String getTargetURL() { + return targetURL; + } + + public Transfer setTargetURL(String targetURL) { + this.targetURL = targetURL; + return this; + } + + public String getTargetUser() { + return targetUser; + } + + public Transfer setTargetUser(String targetUser) { + this.targetUser = targetUser; + return this; + } + + public String getTargetPassword() { + return targetPassword; + } + + public Transfer setTargetPassword(String targetPassword) { + this.targetPassword = targetPassword; + return this; + } + + public int getReceiveTimeout() { + return receiveTimeout; + } + + public Transfer setReceiveTimeout(int receiveTimeout) { + this.receiveTimeout = receiveTimeout; + return this; + } + + public String getSourceClientID() { + return sourceClientID; + } + + public Transfer setSourceClientID(String sourceClientID) { + this.sourceClientID = sourceClientID; + return this; + } + + public String getSourceProtocol() { + return sourceProtocol; + } + + public Transfer setSourceProtocol(String sourceProtocol) { + this.sourceProtocol = sourceProtocol; + return this; + } + + public String getSourceQueue() { + return sourceQueue; + } + + public Transfer setSourceQueue(String sourceQueue) { + this.sourceQueue = sourceQueue; + return this; + } + + public String getSharedDurableSubscription() { + return sharedDurableSubscription; + } + + public Transfer setSharedDurableSubscription(String sharedDurableSubscription) { + this.sharedDurableSubscription = sharedDurableSubscription; + return this; + } + + public String getSharedSubscription() { + return sharedSubscription; + } + + public Transfer setSharedSubscription(String sharedSubscription) { + this.sharedSubscription = sharedSubscription; + return this; + } + + public String getDurableConsumer() { + return durableConsumer; + } + + public Transfer setDurableConsumer(String durableConsumer) { + this.durableConsumer = durableConsumer; + return this; + } + + public boolean isNoLocal() { + return noLocal; + } + + public Transfer setNoLocal(boolean noLocal) { + this.noLocal = noLocal; + return this; + } + + public String getSourceTopic() { + return sourceTopic; + } + + public Transfer setSourceTopic(String sourceTopic) { + this.sourceTopic = sourceTopic; + return this; + } + + public String getFilter() { + return filter; + } + + public Transfer setFilter(String filter) { + this.filter = filter; + return this; + } + + public String getTargetProtocol() { + return targetProtocol; + } + + public Transfer setTargetProtocol(String targetProtocol) { + this.targetProtocol = targetProtocol; + return this; + } + + public int getCommitInterval() { + return commitInterval; + } + + public Transfer setCommitInterval(int commitInterval) { + this.commitInterval = commitInterval; + return this; + } + + public boolean isCopy() { return copy; } + public Transfer setCopy(boolean copy) { + this.copy = copy; + return this; + } + + public String getTargetQueue() { + return targetQueue; + } + + public Transfer setTargetQueue(String targetQueue) { + this.targetQueue = targetQueue; + return this; + } + + public String getTargetTopic() { + return targetTopic; + } + + public Transfer setTargetTopic(String targetTopic) { + this.targetTopic = targetTopic; + return this; + } + + public int getMessageCount() { + return messageCount; + } + + public Transfer setMessageCount(int messageCount) { + this.messageCount = messageCount; + return this; + } + @SuppressWarnings("StringEquality") @Override public Object execute(ActionContext context) throws Exception { @@ -183,7 +380,7 @@ public class Transfer extends InputAbstract { sourceConnection.start(); int pending = 0, total = 0; - while (true) { + while (total < messageCount) { Message receivedMessage; if (receiveTimeout < 0) { @@ -231,7 +428,7 @@ public class Transfer extends InputAbstract { sourceConnection.close(); targetConnection.close(); - return null; + return total; } Destination createDestination(String role, Session session, String queue, String topic) throws Exception { diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java index 861c59e85f..6a6b2d3dfc 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java @@ -37,6 +37,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; import java.io.File; import java.util.ArrayList; @@ -142,18 +143,30 @@ public class CliTestBase { } protected List consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception { - Destination destination = fqqn ? session.createQueue(address) : getDestination(address); - MessageConsumer consumer = session.createConsumer(destination); - List messages = new ArrayList<>(); - for (int i = 0; i < noMessages; i++) { - Message m = consumer.receive(1000); - assertNotNull(m); - messages.add(m); + Destination destination = fqqn ? session.createQueue(address) : getDestination(address); + + try (MessageConsumer consumer = session.createConsumer(destination)) { + for (int i = 0; i < noMessages; i++) { + Message m = consumer.receive(1000); + assertNotNull(m); + messages.add(m); + } } + return messages; } + protected void produceMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception { + Destination destination = fqqn ? session.createQueue(address) : getDestination(address); + + try (MessageProducer producer = session.createProducer(destination)) { + for (int i = 0; i < noMessages; i++) { + producer.send(session.createTextMessage("test message: " + i)); + } + } + } + Destination getDestination(String queueName) { return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE); } diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/TransferTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/TransferTest.java new file mode 100644 index 0000000000..97d76ff9a4 --- /dev/null +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/TransferTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.cli.test; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.cli.commands.messages.Transfer; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.management.ManagementContext; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.Session; + +public class TransferTest extends CliTestBase { + private Connection connection; + private ActiveMQConnectionFactory cf; + private ActiveMQServer server; + private static final int TEST_MESSAGE_COUNT = 10; + + @Before + @Override + public void setup() throws Exception { + setupAuth(); + super.setup(); + server = ((Pair)startServer()).getB(); + cf = getConnectionFactory(61616); + connection = cf.createConnection("admin", "admin"); + } + + @After + @Override + public void tearDown() throws Exception { + closeConnection(cf, connection); + super.tearDown(); + } + + @Test + public void testTransferMessages() throws Exception { + testTransferMessages(TEST_MESSAGE_COUNT, 0); + } + + @Test + public void testTransferMessagesWithMessageCount() throws Exception { + testTransferMessages(TEST_MESSAGE_COUNT, 5); + } + + private void testTransferMessages(int messages, int limit) throws Exception { + String sourceQueueName = "SOURCE_QUEUE"; + String targetQueueName = "TARGET_QUEUE"; + + Session session = createSession(connection); + produceMessages(session, sourceQueueName, messages, false); + + Queue sourceQueue = server.locateQueue(sourceQueueName); + + Assert.assertEquals(messages, sourceQueue.getMessageCount()); + + Transfer transfer = new Transfer() + .setSourceUser("admin") + .setSourcePassword("admin") + .setSourceQueue(sourceQueueName) + .setTargetUser("admin") + .setTargetPassword("admin") + .setTargetQueue(targetQueueName); + + if (limit > 0) { + transfer.setMessageCount(limit); + + Assert.assertEquals(limit, transfer.execute(new TestActionContext())); + + Queue targetQueue = server.locateQueue(targetQueueName); + Assert.assertEquals(messages - limit, sourceQueue.getMessageCount()); + Assert.assertEquals(limit, targetQueue.getMessageCount()); + } else { + Assert.assertEquals(messages, transfer.execute(new TestActionContext())); + + Queue targetQueue = server.locateQueue(targetQueueName); + Assert.assertEquals(0, sourceQueue.getMessageCount()); + Assert.assertEquals(messages, targetQueue.getMessageCount()); + } + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 414a9da69b..f5fc797e77 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -523,6 +523,13 @@ public interface QueueControl { @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName, @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception; + @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION) + int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transactions during the operation to avoid OutOfMemory") int flushLimit, + @Parameter(name = "filter", desc = "A message filter (can be empty)") String filter, + @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName, + @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates, + @Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception; + /** * Sends the message corresponding to the specified message ID to this queue's dead letter address. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 67f75496a1..5555b36a4b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1245,8 +1245,17 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception { + return moveMessages(flushLimit, filterStr, otherQueueName, rejectDuplicates, -1); + } + + @Override + public int moveMessages(final int flushLimit, + final String filterStr, + final String otherQueueName, + final boolean rejectDuplicates, + final int messageCount) throws Exception { if (AuditLogger.isEnabled()) { - AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates); + AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount); } checkStarted(); @@ -1260,7 +1269,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); } - int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, binding); + int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding); return retValue; } finally { blockOnIO(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 39abbc4769..e08274a748 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -358,6 +358,13 @@ public interface Queue extends Bindable,CriticalComponent { boolean rejectDuplicates, Binding binding) throws Exception; + int moveReferences(int flushLimit, + Filter filter, + SimpleString toAddress, + boolean rejectDuplicates, + int messageCount, + Binding binding) throws Exception; + int retryMessages(Filter filter) throws Exception; default int retryMessages(Filter filter, Integer expectedHits) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 276bd9e3ec..082c9cb687 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2544,9 +2544,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final SimpleString toAddress, final boolean rejectDuplicates, final Binding binding) throws Exception { + return moveReferences(flushLimit, filter, toAddress, rejectDuplicates, -1, binding); + } + + @Override + public int moveReferences(final int flushLimit, + final Filter filter, + final SimpleString toAddress, + final boolean rejectDuplicates, + final int messageCount, + final Binding binding) throws Exception { + final Integer expectedHits = messageCount > 0 ? messageCount : null; final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress); return iterQueue(flushLimit, filter, new QueueIterateAction() { + @Override + public Integer expectedHits() { + return expectedHits; + } + @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { boolean ignored = false; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 3a150c7637..607670ad0c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1459,6 +1459,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return 0; } + @Override + public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception { + return 0; + } + @Override public void addRedistributor(long delay) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index a16c49dd2b..7ff689793e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; @@ -1800,6 +1801,44 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(otherQueue); } + @Test + public void testMoveMessagesWithMessageCount() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + SimpleString otherAddress = RandomUtil.randomSimpleString(); + SimpleString otherQueue = RandomUtil.randomSimpleString(); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + session.createQueue(new QueueConfiguration(otherQueue).setAddress(otherAddress).setDurable(durable)); + ClientProducer producer = session.createProducer(address); + + for (int i = 0; i < 10; i++) { + ClientMessage message = session.createMessage(durable); + SimpleString key = RandomUtil.randomSimpleString(); + long value = RandomUtil.randomLong(); + message.putLongProperty(key, value); + producer.send(message); + } + + final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue); + Assert.assertEquals(10, binding.getQueue().getMessageCount()); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(10, queueControl.getMessageCount()); + + // moved all messages to otherQueue + int movedMessagesCount = queueControl.moveMessages(QueueControlImpl.FLUSH_LIMIT, null, otherQueue.toString(), false, 5); + Assert.assertEquals(5, movedMessagesCount); + Assert.assertEquals(5, queueControl.getMessageCount()); + + consumeMessages(5, session, queue); + + consumeMessages(5, session, otherQueue); + + session.deleteQueue(queue); + session.deleteQueue(otherQueue); + } + @Test public void testMoveMessage() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 44babe8ba6..b0ddd65d82 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -456,6 +456,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates); } + @Override + public int moveMessages(int flushLimit, String filter, String otherQueueName, boolean rejectDuplicates, int messageCount) throws Exception { + return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount); + } + @Override public int moveMessages(final String filter, final String otherQueueName, diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index cb2aff6ad2..5c182f662e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -917,6 +917,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return 0; } + @Override + public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception { + return 0; + } + @Override public void forceDelivery() { // no-op