From c9f9b33bf953b519659cca87df35cc87de89fc9d Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Wed, 23 Oct 2024 16:22:59 +0100 Subject: [PATCH] ARTEMIS-5131 Add A Copy message button to console This exposes a copyMessage method that simply copies a message to a different queue so the new console can add a copy button --- .../activemq/artemis/logs/AuditLogger.java | 7 + .../api/core/management/QueueControl.java | 4 + .../management/impl/QueueControlImpl.java | 26 ++ .../activemq/artemis/core/server/Queue.java | 3 + .../artemis/core/server/impl/QueueImpl.java | 52 ++++ .../core/server/impl/RoutingContextTest.java | 5 + .../impl/ScheduledDeliveryHandlerTest.java | 5 + .../core/postoffice/impl/fakes/FakeQueue.java | 5 + .../ManagementWithPagingServerTest.java | 164 +++++++++++ .../management/QueueControlTest.java | 264 ++++++++++++++++++ .../management/QueueControlUsingCoreTest.java | 5 + 11 files changed, 540 insertions(+) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 2773ab3427..85082e542b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2801,4 +2801,11 @@ public interface AuditLogger { @LogMessage(id = 601789, value = "User {} is getting the number of messages received on target resource: {}", level = LogMessage.Level.INFO) void getMessagesReceived(String user, Object source); + + static void copyMessage(Object source, Object... args) { + BASE_LOGGER.copyMessage(getCaller(), source, parametersList(args)); + } + + @LogMessage(id = 601790, value = "User {} is copying a message to another queue on target resource: {} {}", level = LogMessage.Level.INFO) + void copyMessage(String user, Object source, String args); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 29aa30f028..8fd14b2eea 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -536,6 +536,10 @@ public interface QueueControl { @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates, @Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception; + @Operation(desc = "Send a copy of the message with given messageID to another queue)", impact = MBeanOperationInfo.ACTION) + boolean copyMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID, + @Parameter(name = "targetQueue", desc = "The name of the queue to copy the messages to") String targetQueue) throws Exception; + /** * Sends the message corresponding to the specified message ID to this queue's dead letter address. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 86988c1c6c..5223dfd694 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1361,6 +1361,32 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override + public boolean copyMessage(final long messageID, + final String targetQueue) throws Exception { + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.copyMessage(queue, messageID, targetQueue); + } + checkStarted(); + + clearIO(); + try { + Binding binding = server.getPostOffice().getBinding(SimpleString.of(targetQueue)); + + if (binding == null) { + throw ActiveMQMessageBundle.BUNDLE.noQueueFound(targetQueue); + } + + return queue.copyReference(messageID, binding.getAddress(), binding); + } finally { + blockOnIO(); + } + } + + } + @Override public int moveMessages(final String filterStr, final String otherQueueName) throws Exception { return moveMessages(filterStr, otherQueueName, false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index a88235cbfa..1af9a0371e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -380,6 +380,9 @@ public interface Queue extends Bindable,CriticalComponent { int messageCount, Binding binding) throws Exception; + + boolean copyReference(long messageID, SimpleString queue, Binding binding) throws Exception; + int retryMessages(Filter filter) throws Exception; default int retryMessages(Filter filter, Integer expectedHits) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 39b4186acb..8f4249b16b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2790,6 +2790,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { }); } + @Override + public synchronized boolean copyReference(final long messageID, + final SimpleString toQueue, + final Binding binding) throws Exception { + try (LinkedListIterator iter = iterator()) { + while (iter.hasNext()) { + MessageReference ref = iter.next(); + if (ref.getMessage().getMessageID() == messageID) { + try { + copy(null, toQueue, binding, ref); + } catch (Exception e) { + throw e; + } + return true; + } + } + return false; + } + } + public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception { return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override @@ -3678,6 +3698,38 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return routingStatus; } + private RoutingStatus copy(final Transaction originalTX, + final SimpleString address, + final Binding binding, + final MessageReference ref) throws Exception { + Transaction tx; + + if (originalTX != null) { + tx = originalTX; + } else { + // if no TX we create a new one to commit at the end + tx = new TransactionImpl(storageManager); + } + + Message copyMessage = makeCopy(ref, false, false, address); + + Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE); + if (originalRoutingType != null && originalRoutingType instanceof Byte) { + copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType)); + } + + RoutingStatus routingStatus; + { + RoutingContext context = new RoutingContextImpl(tx); + routingStatus = postOffice.route(copyMessage, context, false, false, binding); + } + + if (originalTX == null) { + tx.commit(); + } + return routingStatus; + } + @SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"}) private boolean moveBetweenSnFQueues(final SimpleString queueSuffix, final Transaction tx, diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index 8a42425963..e3816a489f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -728,6 +728,11 @@ public class RoutingContextTest { return 0; } + @Override + public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception { + return false; + } + @Override public int retryMessages(Filter filter) throws Exception { return 0; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 2dc71e587a..18b055d628 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1486,6 +1486,11 @@ public class ScheduledDeliveryHandlerTest { return 0; } + @Override + public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception { + return false; + } + @Override public void addRedistributor(long delay) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java index 8363ccb0f1..470705d16b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java @@ -948,6 +948,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return 0; } + @Override + public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception { + return false; + } + @Override public void forceDelivery() { // no-op diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java index 77fe005a79..6cf43600bf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java @@ -17,9 +17,12 @@ package org.apache.activemq.artemis.tests.integration.management; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.json.JsonArray; import org.apache.activemq.artemis.json.JsonNumber; import org.apache.activemq.artemis.json.JsonObject; @@ -27,6 +30,8 @@ import org.apache.activemq.artemis.json.JsonValue; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Random; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.JsonUtil; @@ -200,6 +205,125 @@ public class ManagementWithPagingServerTest extends ManagementTestBase { assertNull(console.getError()); } + @Test + public void testCopyMessageWhilstPaging() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + SimpleString otherAddress = RandomUtil.randomSimpleString(); + SimpleString otherQueue = RandomUtil.randomSimpleString(); + + session1.createQueue(QueueConfiguration.of(queue).setAddress(address)); + session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress)); + + QueueControl queueControl = createManagementControl(address, queue); + + QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue); + + int num = 100; + + ClientProducer producer = session1.createProducer(address); + for (int i = 0; i < num; i++) { + ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i); + producer.send(message); + } + + Map[] messages = queueControl.listMessages(null); + + long messageID = (Long) messages[99].get("messageID"); + + assertFalse(queueControl.copyMessage(messageID, otherQueue.toString())); + + messageID = (Long) messages[0].get("messageID"); + + assertTrue(queueControl.copyMessage(messageID, otherQueue.toString())); + + Map[] copiedMessages = otherQueueControl.listMessages(null); + + assertEquals(1, copiedMessages.length); + } + + @Test + public void testCopyMessageWhilstPagingSameAddress() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + SimpleString otherQueue = RandomUtil.randomSimpleString(); + + session1.createQueue(QueueConfiguration.of(queue).setAddress(address).setRoutingType(RoutingType.ANYCAST)); + session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(address).setRoutingType(RoutingType.ANYCAST)); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + + QueueControl otherQueueControl = createManagementControl(address, otherQueue, RoutingType.ANYCAST); + + int num = 200; + + ClientProducer producer = session1.createProducer(address); + for (int i = 0; i < num; i++) { + ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i); + producer.send(message); + } + + Map[] messages = queueControl.listMessages(null); + + assertEquals(100, messages.length); + + Map[] otherMessages = otherQueueControl.listMessages(null); + + assertEquals(100, otherMessages.length); + + long messageID = (Long) messages[0].get("messageID"); + + assertTrue(queueControl.copyMessage(messageID, otherQueue.toString())); + + otherMessages = otherQueueControl.listMessages(null); + + assertEquals(101, otherMessages.length); + + messageID = (Long) otherMessages[100].get("messageID"); + + //this should fail as the message was paged successfully + assertFalse(otherQueueControl.copyMessage(messageID, queue.toString())); + } + + @Test + public void testMoveMessageWhilstPagingAndConsuming() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + SimpleString otherAddress = RandomUtil.randomSimpleString(); + SimpleString otherQueue = RandomUtil.randomSimpleString(); + + session1.createQueue(QueueConfiguration.of(queue).setAddress(address)); + session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress)); + + QueueControl queueControl = createManagementControl(address, queue); + + QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue); + + int num = 1000; + + ClientProducer producer = session1.createProducer(address); + for (int i = 0; i < num; i++) { + ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i); + producer.send(message); + } + + ManagementCopyThread console = new ManagementCopyThread(queueControl, otherQueue.toString()); + ReceiverThread receiver = new ReceiverThread(queue, num, 0); + console.start(); + receiver.start(); + + receiver.join(); + console.stop = true; + console.join(); + + Map[] messages = otherQueueControl.listMessages(null); + + assertEquals(messages.length, console.copiedMessages); + } + @Override @BeforeEach public void setUp() throws Exception { @@ -345,4 +469,44 @@ public class ManagementWithPagingServerTest extends ManagementTestBase { stop = true; } } + + private class ManagementCopyThread extends Thread { + + private QueueControl queueControl; + private String queue; + private volatile boolean stop = false; + + int copiedMessages = 0; + private Exception error = null; + + private ManagementCopyThread(QueueControl queueControl, String queue) { + this.queueControl = queueControl; + this.queue = queue; + } + + @Override + public void run() { + try { + Random random = new Random(System.currentTimeMillis()); + while (!stop) { + long messageID = random.nextInt(1000); + boolean copied = queueControl.copyMessage(messageID, queue); + System.out.println("messageID = " + messageID); + if (copied) { + copiedMessages++; + } + } + } catch (Exception e) { + error = e; + } + } + + public Exception getError() { + return error; + } + + public void exit() { + stop = true; + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 1d8ba69cd6..48a2970b91 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -41,6 +41,7 @@ import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -55,6 +56,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -2618,6 +2620,268 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(otherQueue); } + @TestTemplate + public void testCopyMessage() throws Exception { + SimpleString address = SimpleString.of("address");//RandomUtil.randomSimpleString(); + SimpleString queue = SimpleString.of("queue");//RandomUtil.randomSimpleString(); + SimpleString otherAddress = SimpleString.of("otherAddress");//RandomUtil.randomSimpleString(); + SimpleString otherQueue = SimpleString.of("otherQueue");//RandomUtil.randomSimpleString(); + SimpleString otherQueue2 = SimpleString.of("otherQueue2");//RandomUtil.randomSimpleString(); + + + session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable)); + session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable)); + session.createQueue(QueueConfiguration.of(otherQueue2).setAddress(otherAddress).setDurable(durable)); + ClientProducer producer = session.createProducer(address); + + // send 2 messages on queue + producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable)); + + QueueControl queueControl = createManagementControl(address, queue); + QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue); + QueueControl otherQueueControl2 = createManagementControl(otherAddress, otherQueue2); + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 0, durable); + assertMessageMetrics(otherQueueControl2, 0, durable); + + // the message IDs are set on the server + Map[] messages = queueControl.listMessages(null); + assertEquals(2, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + boolean copied = queueControl.copyMessage(messageID, otherQueue.toString()); + assertTrue(copied); + + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 1, durable); + assertMessageMetrics(otherQueueControl2, 0, durable); + + messageID = (Long) messages[1].get("messageID"); + copied = queueControl.copyMessage(messageID, otherQueue.toString()); + assertTrue(copied); + + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 2, durable); + assertMessageMetrics(otherQueueControl2, 0, durable); + + consumeMessages(2, session, queue); + consumeMessages(2, session, otherQueue); + consumeMessages(0, session, otherQueue2); + + session.deleteQueue(queue); + session.deleteQueue(otherQueue); + session.deleteQueue(otherQueue2); + } + + @TestTemplate + public void testCopyLargeMessage() throws Exception { + SimpleString address = SimpleString.of("address");//RandomUtil.randomSimpleString(); + SimpleString queue = SimpleString.of("queue");//RandomUtil.randomSimpleString(); + SimpleString otherAddress = SimpleString.of("otherAddress");//RandomUtil.randomSimpleString(); + SimpleString otherQueue = SimpleString.of("otherQueue");//RandomUtil.randomSimpleString(); + SimpleString otherQueue2 = SimpleString.of("otherQueue2");//RandomUtil.randomSimpleString(); + + session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable)); + session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable)); + session.createQueue(QueueConfiguration.of(otherQueue2).setAddress(otherAddress).setDurable(durable)); + ClientProducer producer = session.createProducer(address); + + final byte[] payload1 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000]; + final Random random1 = new Random(System.currentTimeMillis()); + random1.nextBytes(payload1); + final ClientMessage sentMessage1 = session.createMessage(durable).writeBodyBufferBytes(payload1); + + final byte[] payload2 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000]; + final Random random2 = new Random(System.currentTimeMillis()); + random2.nextBytes(payload2); + final ClientMessage sentMessage2 = session.createMessage(durable).writeBodyBufferBytes(payload2); + + // send 2 messages on queue + producer.send(sentMessage1); + producer.send(sentMessage2); + + QueueControl queueControl = createManagementControl(address, queue); + QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue); + QueueControl otherQueueControl2 = createManagementControl(otherAddress, otherQueue2); + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 0, durable); + assertMessageMetrics(otherQueueControl2, 0, durable); + + // the message IDs are set on the server + Map[] messages = queueControl.listMessages(null); + assertEquals(2, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + boolean copied = queueControl.copyMessage(messageID, otherQueue.toString()); + assertTrue(copied); + + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 1, durable); + assertMessageMetrics(otherQueueControl2, 0, durable); + + messageID = (Long) messages[1].get("messageID"); + copied = queueControl.copyMessage(messageID, otherQueue.toString()); + assertTrue(copied); + + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 2, durable); + assertMessageMetrics(otherQueueControl2, 0, durable); + + ClientConsumer consumer1 = session.createConsumer(queue); + ClientMessage clientMessage = consumer1.receiveImmediate(); + assertNotNull(clientMessage); + byte[] returnedPayload = new byte[clientMessage.getBodySize()]; + clientMessage.getBodyBuffer().readBytes(returnedPayload); + assertEqualsByteArrays(payload1, returnedPayload); + + clientMessage = consumer1.receiveImmediate(); + assertNotNull(clientMessage); + returnedPayload = new byte[clientMessage.getBodySize()]; + clientMessage.getBodyBuffer().readBytes(returnedPayload); + assertEqualsByteArrays(payload2, returnedPayload); + + ClientConsumer consumer2 = session.createConsumer(otherQueue); + clientMessage = consumer2.receiveImmediate(); + assertNotNull(clientMessage); + returnedPayload = new byte[clientMessage.getBodySize()]; + clientMessage.getBodyBuffer().readBytes(returnedPayload); + assertEqualsByteArrays(payload1, returnedPayload); + + clientMessage = consumer2.receiveImmediate(); + assertNotNull(clientMessage); + returnedPayload = new byte[clientMessage.getBodySize()]; + clientMessage.getBodyBuffer().readBytes(returnedPayload); + assertEqualsByteArrays(payload2, returnedPayload); + + consumeMessages(0, session, otherQueue2); + + consumer1.close(); + consumer2.close(); + + session.deleteQueue(queue); + session.deleteQueue(otherQueue); + session.deleteQueue(otherQueue2); + } + + @TestTemplate + public void testCoreCopyMessage() throws Exception { + testCopyMessage("CORE", false); + } + + @TestTemplate + public void testCoreCopyLargeMessage() throws Exception { + testCopyMessage("CORE", true); + } + + @TestTemplate + public void testAMQPCopyMessage() throws Exception { + testCopyMessage("AMQP", false); + } + + @TestTemplate + public void testAMQPCopyLargeMessage() throws Exception { + testCopyMessage("AMQP", true); + } + + @TestTemplate + public void testOpenwireCopyMessage() throws Exception { + testCopyMessage("OPENWIRE", false); + } + + + @TestTemplate + public void testOpenwireCopyLargeMessage() throws Exception { + testCopyMessage("OPENWIRE", true); + } + + public void testCopyMessage(String protocol, boolean isLarge) throws Exception { + SimpleString address = SimpleString.of("queue1");//RandomUtil.randomSimpleString(); + SimpleString queue = SimpleString.of("queue1");//RandomUtil.randomSimpleString(); + SimpleString otherAddress = SimpleString.of("queue2");//RandomUtil.randomSimpleString(); + SimpleString otherQueue = SimpleString.of("queue2");//RandomUtil.randomSimpleString(); + + + session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + try (Connection connection = connectionFactory.createConnection()) { + Session jmsProducerSession = connection.createSession(Session.AUTO_ACKNOWLEDGE); + Session jmsConsumerSession = connection.createSession(Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue1 = jmsProducerSession.createQueue("queue1"); + javax.jms.Queue queue2 = jmsConsumerSession.createQueue("queue2"); + + MessageProducer producer = jmsProducerSession.createProducer(queue1); + + final String payload1; + final String payload2; + if (isLarge) { + final Random random1 = new Random(System.currentTimeMillis()); + byte[] bytePayload1 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000]; + random1.nextBytes(bytePayload1); + payload1 = new String(bytePayload1); + final Random random2 = new Random(System.currentTimeMillis()); + byte[] bytePayload2 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000]; + random2.nextBytes(bytePayload2); + payload2 = new String(bytePayload2); + } else { + payload1 = "message1"; + payload2 = "message2"; + } + + // send 2 messages on queue + producer.send(jmsProducerSession.createTextMessage(payload1)); + producer.send(jmsProducerSession.createTextMessage(payload2)); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue, RoutingType.ANYCAST); + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 0, durable); + + // the message IDs are set on the server + Map[] messages = queueControl.listMessages(null); + assertEquals(2, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + boolean copied = queueControl.copyMessage(messageID, otherQueue.toString()); + assertTrue(copied); + + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 1, durable); + + messageID = (Long) messages[1].get("messageID"); + copied = queueControl.copyMessage(messageID, otherQueue.toString()); + assertTrue(copied); + + assertMessageMetrics(queueControl, 2, durable); + assertMessageMetrics(otherQueueControl, 2, durable); + + MessageConsumer consumer1 = jmsConsumerSession.createConsumer(queue1); + MessageConsumer consumer2 = jmsConsumerSession.createConsumer(queue2); + connection.start(); + + javax.jms.TextMessage message = (TextMessage) consumer1.receive(500); + assertNotNull(message); + assertEquals(payload1, message.getText()); + message = (TextMessage) consumer1.receive(500); + assertNotNull(message); + assertEquals(payload2, message.getText()); + + message = (TextMessage) consumer2.receive(500); + assertNotNull(message); + assertEquals(payload1, message.getText()); + message = (TextMessage) consumer2.receive(500); + assertNotNull(message); + assertEquals(payload2, message.getText()); + + connection.close(); + } + session.deleteQueue(queue); + session.deleteQueue(otherQueue); + } + /** * Moving message from another address to a single "child" queue of a multicast address * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index ad2d4c4df8..5e7191e45a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -506,6 +506,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Integer) proxy.invokeOperation(Integer.class, "moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount); } + @Override + public boolean copyMessage(long messageID, String targetQueue) throws Exception { + return (Boolean) proxy.invokeOperation("copyMessage", messageID, targetQueue); + } + @Override public int moveMessages(final String filter, final String otherQueueName,