ARTEMIS-5131 Add A Copy message button to console
This exposes a copyMessage method that simply copies a message to a different queue so the new console can add a copy button
This commit is contained in:
parent
7f1751c43b
commit
c9f9b33bf9
|
@ -2801,4 +2801,11 @@ public interface AuditLogger {
|
||||||
@LogMessage(id = 601789, value = "User {} is getting the number of messages received on target resource: {}", level = LogMessage.Level.INFO)
|
@LogMessage(id = 601789, value = "User {} is getting the number of messages received on target resource: {}", level = LogMessage.Level.INFO)
|
||||||
void getMessagesReceived(String user, Object source);
|
void getMessagesReceived(String user, Object source);
|
||||||
|
|
||||||
|
|
||||||
|
static void copyMessage(Object source, Object... args) {
|
||||||
|
BASE_LOGGER.copyMessage(getCaller(), source, parametersList(args));
|
||||||
|
}
|
||||||
|
|
||||||
|
@LogMessage(id = 601790, value = "User {} is copying a message to another queue on target resource: {} {}", level = LogMessage.Level.INFO)
|
||||||
|
void copyMessage(String user, Object source, String args);
|
||||||
}
|
}
|
||||||
|
|
|
@ -536,6 +536,10 @@ public interface QueueControl {
|
||||||
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
|
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
|
||||||
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;
|
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;
|
||||||
|
|
||||||
|
@Operation(desc = "Send a copy of the message with given messageID to another queue)", impact = MBeanOperationInfo.ACTION)
|
||||||
|
boolean copyMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
|
||||||
|
@Parameter(name = "targetQueue", desc = "The name of the queue to copy the messages to") String targetQueue) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
|
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1361,6 +1361,32 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean copyMessage(final long messageID,
|
||||||
|
final String targetQueue) throws Exception {
|
||||||
|
// this is a critical task, we need to prevent parallel tasks running
|
||||||
|
try (AutoCloseable lock = server.managementLock()) {
|
||||||
|
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||||
|
AuditLogger.copyMessage(queue, messageID, targetQueue);
|
||||||
|
}
|
||||||
|
checkStarted();
|
||||||
|
|
||||||
|
clearIO();
|
||||||
|
try {
|
||||||
|
Binding binding = server.getPostOffice().getBinding(SimpleString.of(targetQueue));
|
||||||
|
|
||||||
|
if (binding == null) {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(targetQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue.copyReference(messageID, binding.getAddress(), binding);
|
||||||
|
} finally {
|
||||||
|
blockOnIO();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception {
|
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception {
|
||||||
return moveMessages(filterStr, otherQueueName, false);
|
return moveMessages(filterStr, otherQueueName, false);
|
||||||
|
|
|
@ -380,6 +380,9 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
int messageCount,
|
int messageCount,
|
||||||
Binding binding) throws Exception;
|
Binding binding) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
boolean copyReference(long messageID, SimpleString queue, Binding binding) throws Exception;
|
||||||
|
|
||||||
int retryMessages(Filter filter) throws Exception;
|
int retryMessages(Filter filter) throws Exception;
|
||||||
|
|
||||||
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
|
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
|
||||||
|
|
|
@ -2790,6 +2790,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean copyReference(final long messageID,
|
||||||
|
final SimpleString toQueue,
|
||||||
|
final Binding binding) throws Exception {
|
||||||
|
try (LinkedListIterator<MessageReference> iter = iterator()) {
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
MessageReference ref = iter.next();
|
||||||
|
if (ref.getMessage().getMessageID() == messageID) {
|
||||||
|
try {
|
||||||
|
copy(null, toQueue, binding, ref);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
|
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
|
||||||
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
|
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -3678,6 +3698,38 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
return routingStatus;
|
return routingStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RoutingStatus copy(final Transaction originalTX,
|
||||||
|
final SimpleString address,
|
||||||
|
final Binding binding,
|
||||||
|
final MessageReference ref) throws Exception {
|
||||||
|
Transaction tx;
|
||||||
|
|
||||||
|
if (originalTX != null) {
|
||||||
|
tx = originalTX;
|
||||||
|
} else {
|
||||||
|
// if no TX we create a new one to commit at the end
|
||||||
|
tx = new TransactionImpl(storageManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
Message copyMessage = makeCopy(ref, false, false, address);
|
||||||
|
|
||||||
|
Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
|
||||||
|
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
|
||||||
|
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
|
||||||
|
}
|
||||||
|
|
||||||
|
RoutingStatus routingStatus;
|
||||||
|
{
|
||||||
|
RoutingContext context = new RoutingContextImpl(tx);
|
||||||
|
routingStatus = postOffice.route(copyMessage, context, false, false, binding);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (originalTX == null) {
|
||||||
|
tx.commit();
|
||||||
|
}
|
||||||
|
return routingStatus;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
|
@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
|
||||||
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
|
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
|
||||||
final Transaction tx,
|
final Transaction tx,
|
||||||
|
|
|
@ -728,6 +728,11 @@ public class RoutingContextTest {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int retryMessages(Filter filter) throws Exception {
|
public int retryMessages(Filter filter) throws Exception {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -1486,6 +1486,11 @@ public class ScheduledDeliveryHandlerTest {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addRedistributor(long delay) {
|
public void addRedistributor(long delay) {
|
||||||
|
|
||||||
|
|
|
@ -948,6 +948,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void forceDelivery() {
|
public void forceDelivery() {
|
||||||
// no-op
|
// no-op
|
||||||
|
|
|
@ -17,9 +17,12 @@
|
||||||
package org.apache.activemq.artemis.tests.integration.management;
|
package org.apache.activemq.artemis.tests.integration.management;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.json.JsonArray;
|
import org.apache.activemq.artemis.json.JsonArray;
|
||||||
import org.apache.activemq.artemis.json.JsonNumber;
|
import org.apache.activemq.artemis.json.JsonNumber;
|
||||||
import org.apache.activemq.artemis.json.JsonObject;
|
import org.apache.activemq.artemis.json.JsonObject;
|
||||||
|
@ -27,6 +30,8 @@ import org.apache.activemq.artemis.json.JsonValue;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||||
|
@ -200,6 +205,125 @@ public class ManagementWithPagingServerTest extends ManagementTestBase {
|
||||||
assertNull(console.getError());
|
assertNull(console.getError());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyMessageWhilstPaging() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
SimpleString otherAddress = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
|
||||||
|
session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress));
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
|
||||||
|
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
|
||||||
|
|
||||||
|
int num = 100;
|
||||||
|
|
||||||
|
ClientProducer producer = session1.createProducer(address);
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object>[] messages = queueControl.listMessages(null);
|
||||||
|
|
||||||
|
long messageID = (Long) messages[99].get("messageID");
|
||||||
|
|
||||||
|
assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));
|
||||||
|
|
||||||
|
messageID = (Long) messages[0].get("messageID");
|
||||||
|
|
||||||
|
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
|
||||||
|
|
||||||
|
Map<String, Object>[] copiedMessages = otherQueueControl.listMessages(null);
|
||||||
|
|
||||||
|
assertEquals(1, copiedMessages.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyMessageWhilstPagingSameAddress() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
SimpleString otherQueue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session1.createQueue(QueueConfiguration.of(queue).setAddress(address).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(address).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
|
||||||
|
|
||||||
|
QueueControl otherQueueControl = createManagementControl(address, otherQueue, RoutingType.ANYCAST);
|
||||||
|
|
||||||
|
int num = 200;
|
||||||
|
|
||||||
|
ClientProducer producer = session1.createProducer(address);
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object>[] messages = queueControl.listMessages(null);
|
||||||
|
|
||||||
|
assertEquals(100, messages.length);
|
||||||
|
|
||||||
|
Map<String, Object>[] otherMessages = otherQueueControl.listMessages(null);
|
||||||
|
|
||||||
|
assertEquals(100, otherMessages.length);
|
||||||
|
|
||||||
|
long messageID = (Long) messages[0].get("messageID");
|
||||||
|
|
||||||
|
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
|
||||||
|
|
||||||
|
otherMessages = otherQueueControl.listMessages(null);
|
||||||
|
|
||||||
|
assertEquals(101, otherMessages.length);
|
||||||
|
|
||||||
|
messageID = (Long) otherMessages[100].get("messageID");
|
||||||
|
|
||||||
|
//this should fail as the message was paged successfully
|
||||||
|
assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveMessageWhilstPagingAndConsuming() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
SimpleString otherAddress = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
|
||||||
|
session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress));
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
|
||||||
|
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
|
||||||
|
|
||||||
|
int num = 1000;
|
||||||
|
|
||||||
|
ClientProducer producer = session1.createProducer(address);
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
ManagementCopyThread console = new ManagementCopyThread(queueControl, otherQueue.toString());
|
||||||
|
ReceiverThread receiver = new ReceiverThread(queue, num, 0);
|
||||||
|
console.start();
|
||||||
|
receiver.start();
|
||||||
|
|
||||||
|
receiver.join();
|
||||||
|
console.stop = true;
|
||||||
|
console.join();
|
||||||
|
|
||||||
|
Map<String, Object>[] messages = otherQueueControl.listMessages(null);
|
||||||
|
|
||||||
|
assertEquals(messages.length, console.copiedMessages);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
@ -345,4 +469,44 @@ public class ManagementWithPagingServerTest extends ManagementTestBase {
|
||||||
stop = true;
|
stop = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ManagementCopyThread extends Thread {
|
||||||
|
|
||||||
|
private QueueControl queueControl;
|
||||||
|
private String queue;
|
||||||
|
private volatile boolean stop = false;
|
||||||
|
|
||||||
|
int copiedMessages = 0;
|
||||||
|
private Exception error = null;
|
||||||
|
|
||||||
|
private ManagementCopyThread(QueueControl queueControl, String queue) {
|
||||||
|
this.queueControl = queueControl;
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Random random = new Random(System.currentTimeMillis());
|
||||||
|
while (!stop) {
|
||||||
|
long messageID = random.nextInt(1000);
|
||||||
|
boolean copied = queueControl.copyMessage(messageID, queue);
|
||||||
|
System.out.println("messageID = " + messageID);
|
||||||
|
if (copied) {
|
||||||
|
copiedMessages++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
error = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception getError() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void exit() {
|
||||||
|
stop = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -55,6 +56,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
@ -2618,6 +2620,268 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
session.deleteQueue(otherQueue);
|
session.deleteQueue(otherQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testCopyMessage() throws Exception {
|
||||||
|
SimpleString address = SimpleString.of("address");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = SimpleString.of("queue");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherAddress = SimpleString.of("otherAddress");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue = SimpleString.of("otherQueue");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue2 = SimpleString.of("otherQueue2");//RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
|
||||||
|
session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable));
|
||||||
|
session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable));
|
||||||
|
session.createQueue(QueueConfiguration.of(otherQueue2).setAddress(otherAddress).setDurable(durable));
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
|
||||||
|
// send 2 messages on queue
|
||||||
|
producer.send(session.createMessage(durable));
|
||||||
|
producer.send(session.createMessage(durable));
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
|
||||||
|
QueueControl otherQueueControl2 = createManagementControl(otherAddress, otherQueue2);
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 0, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl2, 0, durable);
|
||||||
|
|
||||||
|
// the message IDs are set on the server
|
||||||
|
Map<String, Object>[] messages = queueControl.listMessages(null);
|
||||||
|
assertEquals(2, messages.length);
|
||||||
|
long messageID = (Long) messages[0].get("messageID");
|
||||||
|
|
||||||
|
boolean copied = queueControl.copyMessage(messageID, otherQueue.toString());
|
||||||
|
assertTrue(copied);
|
||||||
|
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 1, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl2, 0, durable);
|
||||||
|
|
||||||
|
messageID = (Long) messages[1].get("messageID");
|
||||||
|
copied = queueControl.copyMessage(messageID, otherQueue.toString());
|
||||||
|
assertTrue(copied);
|
||||||
|
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl2, 0, durable);
|
||||||
|
|
||||||
|
consumeMessages(2, session, queue);
|
||||||
|
consumeMessages(2, session, otherQueue);
|
||||||
|
consumeMessages(0, session, otherQueue2);
|
||||||
|
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
session.deleteQueue(otherQueue);
|
||||||
|
session.deleteQueue(otherQueue2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testCopyLargeMessage() throws Exception {
|
||||||
|
SimpleString address = SimpleString.of("address");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = SimpleString.of("queue");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherAddress = SimpleString.of("otherAddress");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue = SimpleString.of("otherQueue");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue2 = SimpleString.of("otherQueue2");//RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable));
|
||||||
|
session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable));
|
||||||
|
session.createQueue(QueueConfiguration.of(otherQueue2).setAddress(otherAddress).setDurable(durable));
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
|
||||||
|
final byte[] payload1 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000];
|
||||||
|
final Random random1 = new Random(System.currentTimeMillis());
|
||||||
|
random1.nextBytes(payload1);
|
||||||
|
final ClientMessage sentMessage1 = session.createMessage(durable).writeBodyBufferBytes(payload1);
|
||||||
|
|
||||||
|
final byte[] payload2 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000];
|
||||||
|
final Random random2 = new Random(System.currentTimeMillis());
|
||||||
|
random2.nextBytes(payload2);
|
||||||
|
final ClientMessage sentMessage2 = session.createMessage(durable).writeBodyBufferBytes(payload2);
|
||||||
|
|
||||||
|
// send 2 messages on queue
|
||||||
|
producer.send(sentMessage1);
|
||||||
|
producer.send(sentMessage2);
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
|
||||||
|
QueueControl otherQueueControl2 = createManagementControl(otherAddress, otherQueue2);
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 0, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl2, 0, durable);
|
||||||
|
|
||||||
|
// the message IDs are set on the server
|
||||||
|
Map<String, Object>[] messages = queueControl.listMessages(null);
|
||||||
|
assertEquals(2, messages.length);
|
||||||
|
long messageID = (Long) messages[0].get("messageID");
|
||||||
|
|
||||||
|
boolean copied = queueControl.copyMessage(messageID, otherQueue.toString());
|
||||||
|
assertTrue(copied);
|
||||||
|
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 1, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl2, 0, durable);
|
||||||
|
|
||||||
|
messageID = (Long) messages[1].get("messageID");
|
||||||
|
copied = queueControl.copyMessage(messageID, otherQueue.toString());
|
||||||
|
assertTrue(copied);
|
||||||
|
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl2, 0, durable);
|
||||||
|
|
||||||
|
ClientConsumer consumer1 = session.createConsumer(queue);
|
||||||
|
ClientMessage clientMessage = consumer1.receiveImmediate();
|
||||||
|
assertNotNull(clientMessage);
|
||||||
|
byte[] returnedPayload = new byte[clientMessage.getBodySize()];
|
||||||
|
clientMessage.getBodyBuffer().readBytes(returnedPayload);
|
||||||
|
assertEqualsByteArrays(payload1, returnedPayload);
|
||||||
|
|
||||||
|
clientMessage = consumer1.receiveImmediate();
|
||||||
|
assertNotNull(clientMessage);
|
||||||
|
returnedPayload = new byte[clientMessage.getBodySize()];
|
||||||
|
clientMessage.getBodyBuffer().readBytes(returnedPayload);
|
||||||
|
assertEqualsByteArrays(payload2, returnedPayload);
|
||||||
|
|
||||||
|
ClientConsumer consumer2 = session.createConsumer(otherQueue);
|
||||||
|
clientMessage = consumer2.receiveImmediate();
|
||||||
|
assertNotNull(clientMessage);
|
||||||
|
returnedPayload = new byte[clientMessage.getBodySize()];
|
||||||
|
clientMessage.getBodyBuffer().readBytes(returnedPayload);
|
||||||
|
assertEqualsByteArrays(payload1, returnedPayload);
|
||||||
|
|
||||||
|
clientMessage = consumer2.receiveImmediate();
|
||||||
|
assertNotNull(clientMessage);
|
||||||
|
returnedPayload = new byte[clientMessage.getBodySize()];
|
||||||
|
clientMessage.getBodyBuffer().readBytes(returnedPayload);
|
||||||
|
assertEqualsByteArrays(payload2, returnedPayload);
|
||||||
|
|
||||||
|
consumeMessages(0, session, otherQueue2);
|
||||||
|
|
||||||
|
consumer1.close();
|
||||||
|
consumer2.close();
|
||||||
|
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
session.deleteQueue(otherQueue);
|
||||||
|
session.deleteQueue(otherQueue2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testCoreCopyMessage() throws Exception {
|
||||||
|
testCopyMessage("CORE", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testCoreCopyLargeMessage() throws Exception {
|
||||||
|
testCopyMessage("CORE", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testAMQPCopyMessage() throws Exception {
|
||||||
|
testCopyMessage("AMQP", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testAMQPCopyLargeMessage() throws Exception {
|
||||||
|
testCopyMessage("AMQP", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testOpenwireCopyMessage() throws Exception {
|
||||||
|
testCopyMessage("OPENWIRE", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@TestTemplate
|
||||||
|
public void testOpenwireCopyLargeMessage() throws Exception {
|
||||||
|
testCopyMessage("OPENWIRE", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCopyMessage(String protocol, boolean isLarge) throws Exception {
|
||||||
|
SimpleString address = SimpleString.of("queue1");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = SimpleString.of("queue1");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherAddress = SimpleString.of("queue2");//RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue = SimpleString.of("queue2");//RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
|
||||||
|
session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
|
||||||
|
|
||||||
|
try (Connection connection = connectionFactory.createConnection()) {
|
||||||
|
Session jmsProducerSession = connection.createSession(Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Session jmsConsumerSession = connection.createSession(Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Queue queue1 = jmsProducerSession.createQueue("queue1");
|
||||||
|
javax.jms.Queue queue2 = jmsConsumerSession.createQueue("queue2");
|
||||||
|
|
||||||
|
MessageProducer producer = jmsProducerSession.createProducer(queue1);
|
||||||
|
|
||||||
|
final String payload1;
|
||||||
|
final String payload2;
|
||||||
|
if (isLarge) {
|
||||||
|
final Random random1 = new Random(System.currentTimeMillis());
|
||||||
|
byte[] bytePayload1 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000];
|
||||||
|
random1.nextBytes(bytePayload1);
|
||||||
|
payload1 = new String(bytePayload1);
|
||||||
|
final Random random2 = new Random(System.currentTimeMillis());
|
||||||
|
byte[] bytePayload2 = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE + 1000];
|
||||||
|
random2.nextBytes(bytePayload2);
|
||||||
|
payload2 = new String(bytePayload2);
|
||||||
|
} else {
|
||||||
|
payload1 = "message1";
|
||||||
|
payload2 = "message2";
|
||||||
|
}
|
||||||
|
|
||||||
|
// send 2 messages on queue
|
||||||
|
producer.send(jmsProducerSession.createTextMessage(payload1));
|
||||||
|
producer.send(jmsProducerSession.createTextMessage(payload2));
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
|
||||||
|
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue, RoutingType.ANYCAST);
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 0, durable);
|
||||||
|
|
||||||
|
// the message IDs are set on the server
|
||||||
|
Map<String, Object>[] messages = queueControl.listMessages(null);
|
||||||
|
assertEquals(2, messages.length);
|
||||||
|
long messageID = (Long) messages[0].get("messageID");
|
||||||
|
|
||||||
|
boolean copied = queueControl.copyMessage(messageID, otherQueue.toString());
|
||||||
|
assertTrue(copied);
|
||||||
|
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 1, durable);
|
||||||
|
|
||||||
|
messageID = (Long) messages[1].get("messageID");
|
||||||
|
copied = queueControl.copyMessage(messageID, otherQueue.toString());
|
||||||
|
assertTrue(copied);
|
||||||
|
|
||||||
|
assertMessageMetrics(queueControl, 2, durable);
|
||||||
|
assertMessageMetrics(otherQueueControl, 2, durable);
|
||||||
|
|
||||||
|
MessageConsumer consumer1 = jmsConsumerSession.createConsumer(queue1);
|
||||||
|
MessageConsumer consumer2 = jmsConsumerSession.createConsumer(queue2);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
javax.jms.TextMessage message = (TextMessage) consumer1.receive(500);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(payload1, message.getText());
|
||||||
|
message = (TextMessage) consumer1.receive(500);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(payload2, message.getText());
|
||||||
|
|
||||||
|
message = (TextMessage) consumer2.receive(500);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(payload1, message.getText());
|
||||||
|
message = (TextMessage) consumer2.receive(500);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(payload2, message.getText());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
session.deleteQueue(otherQueue);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Moving message from another address to a single "child" queue of a multicast address
|
* Moving message from another address to a single "child" queue of a multicast address
|
||||||
*
|
*
|
||||||
|
|
|
@ -506,6 +506,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
||||||
return (Integer) proxy.invokeOperation(Integer.class, "moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount);
|
return (Integer) proxy.invokeOperation(Integer.class, "moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean copyMessage(long messageID, String targetQueue) throws Exception {
|
||||||
|
return (Boolean) proxy.invokeOperation("copyMessage", messageID, targetQueue);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int moveMessages(final String filter,
|
public int moveMessages(final String filter,
|
||||||
final String otherQueueName,
|
final String otherQueueName,
|
||||||
|
|
Loading…
Reference in New Issue