ARTEMIS-3309 Add a parameter to limit messages to move or transfer
This commit is contained in:
parent
8c2b80f234
commit
e03c4fe193
|
@ -105,10 +105,207 @@ public class Transfer extends InputAbstract {
|
||||||
@Option(name = "--target-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
|
@Option(name = "--target-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
|
||||||
String targetTopic;
|
String targetTopic;
|
||||||
|
|
||||||
boolean isCopy() {
|
@Option(name = "--message-count", description = "Number of messages to transfer.")
|
||||||
|
int messageCount = Integer.MAX_VALUE;
|
||||||
|
|
||||||
|
public String getSourceURL() {
|
||||||
|
return sourceURL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourceURL(String sourceURL) {
|
||||||
|
this.sourceURL = sourceURL;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourceUser() {
|
||||||
|
return sourceUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourceUser(String sourceUser) {
|
||||||
|
this.sourceUser = sourceUser;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourcePassword() {
|
||||||
|
return sourcePassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourcePassword(String sourcePassword) {
|
||||||
|
this.sourcePassword = sourcePassword;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetURL() {
|
||||||
|
return targetURL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setTargetURL(String targetURL) {
|
||||||
|
this.targetURL = targetURL;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetUser() {
|
||||||
|
return targetUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setTargetUser(String targetUser) {
|
||||||
|
this.targetUser = targetUser;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetPassword() {
|
||||||
|
return targetPassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setTargetPassword(String targetPassword) {
|
||||||
|
this.targetPassword = targetPassword;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getReceiveTimeout() {
|
||||||
|
return receiveTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setReceiveTimeout(int receiveTimeout) {
|
||||||
|
this.receiveTimeout = receiveTimeout;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourceClientID() {
|
||||||
|
return sourceClientID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourceClientID(String sourceClientID) {
|
||||||
|
this.sourceClientID = sourceClientID;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourceProtocol() {
|
||||||
|
return sourceProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourceProtocol(String sourceProtocol) {
|
||||||
|
this.sourceProtocol = sourceProtocol;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourceQueue() {
|
||||||
|
return sourceQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourceQueue(String sourceQueue) {
|
||||||
|
this.sourceQueue = sourceQueue;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSharedDurableSubscription() {
|
||||||
|
return sharedDurableSubscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSharedDurableSubscription(String sharedDurableSubscription) {
|
||||||
|
this.sharedDurableSubscription = sharedDurableSubscription;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSharedSubscription() {
|
||||||
|
return sharedSubscription;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSharedSubscription(String sharedSubscription) {
|
||||||
|
this.sharedSubscription = sharedSubscription;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDurableConsumer() {
|
||||||
|
return durableConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setDurableConsumer(String durableConsumer) {
|
||||||
|
this.durableConsumer = durableConsumer;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isNoLocal() {
|
||||||
|
return noLocal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setNoLocal(boolean noLocal) {
|
||||||
|
this.noLocal = noLocal;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSourceTopic() {
|
||||||
|
return sourceTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setSourceTopic(String sourceTopic) {
|
||||||
|
this.sourceTopic = sourceTopic;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFilter() {
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setFilter(String filter) {
|
||||||
|
this.filter = filter;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetProtocol() {
|
||||||
|
return targetProtocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setTargetProtocol(String targetProtocol) {
|
||||||
|
this.targetProtocol = targetProtocol;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCommitInterval() {
|
||||||
|
return commitInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setCommitInterval(int commitInterval) {
|
||||||
|
this.commitInterval = commitInterval;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isCopy() {
|
||||||
return copy;
|
return copy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Transfer setCopy(boolean copy) {
|
||||||
|
this.copy = copy;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetQueue() {
|
||||||
|
return targetQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setTargetQueue(String targetQueue) {
|
||||||
|
this.targetQueue = targetQueue;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetTopic() {
|
||||||
|
return targetTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setTargetTopic(String targetTopic) {
|
||||||
|
this.targetTopic = targetTopic;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMessageCount() {
|
||||||
|
return messageCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Transfer setMessageCount(int messageCount) {
|
||||||
|
this.messageCount = messageCount;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("StringEquality")
|
@SuppressWarnings("StringEquality")
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
|
@ -183,7 +380,7 @@ public class Transfer extends InputAbstract {
|
||||||
|
|
||||||
sourceConnection.start();
|
sourceConnection.start();
|
||||||
int pending = 0, total = 0;
|
int pending = 0, total = 0;
|
||||||
while (true) {
|
while (total < messageCount) {
|
||||||
|
|
||||||
Message receivedMessage;
|
Message receivedMessage;
|
||||||
if (receiveTimeout < 0) {
|
if (receiveTimeout < 0) {
|
||||||
|
@ -231,7 +428,7 @@ public class Transfer extends InputAbstract {
|
||||||
sourceConnection.close();
|
sourceConnection.close();
|
||||||
targetConnection.close();
|
targetConnection.close();
|
||||||
|
|
||||||
return null;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
Destination createDestination(String role, Session session, String queue, String topic) throws Exception {
|
Destination createDestination(String role, Session session, String queue, String topic) throws Exception {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -142,18 +143,30 @@ public class CliTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
|
protected List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
|
||||||
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
|
|
||||||
MessageConsumer consumer = session.createConsumer(destination);
|
|
||||||
|
|
||||||
List<Message> messages = new ArrayList<>();
|
List<Message> messages = new ArrayList<>();
|
||||||
for (int i = 0; i < noMessages; i++) {
|
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
|
||||||
Message m = consumer.receive(1000);
|
|
||||||
assertNotNull(m);
|
try (MessageConsumer consumer = session.createConsumer(destination)) {
|
||||||
messages.add(m);
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
Message m = consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
messages.add(m);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return messages;
|
return messages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void produceMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
|
||||||
|
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
|
||||||
|
|
||||||
|
try (MessageProducer producer = session.createProducer(destination)) {
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
producer.send(session.createTextMessage("test message: " + i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Destination getDestination(String queueName) {
|
Destination getDestination(String queueName) {
|
||||||
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
|
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.cli.test;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.server.management.ManagementContext;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
public class TransferTest extends CliTestBase {
|
||||||
|
private Connection connection;
|
||||||
|
private ActiveMQConnectionFactory cf;
|
||||||
|
private ActiveMQServer server;
|
||||||
|
private static final int TEST_MESSAGE_COUNT = 10;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
setupAuth();
|
||||||
|
super.setup();
|
||||||
|
server = ((Pair<ManagementContext, ActiveMQServer>)startServer()).getB();
|
||||||
|
cf = getConnectionFactory(61616);
|
||||||
|
connection = cf.createConnection("admin", "admin");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
closeConnection(cf, connection);
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransferMessages() throws Exception {
|
||||||
|
testTransferMessages(TEST_MESSAGE_COUNT, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransferMessagesWithMessageCount() throws Exception {
|
||||||
|
testTransferMessages(TEST_MESSAGE_COUNT, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testTransferMessages(int messages, int limit) throws Exception {
|
||||||
|
String sourceQueueName = "SOURCE_QUEUE";
|
||||||
|
String targetQueueName = "TARGET_QUEUE";
|
||||||
|
|
||||||
|
Session session = createSession(connection);
|
||||||
|
produceMessages(session, sourceQueueName, messages, false);
|
||||||
|
|
||||||
|
Queue sourceQueue = server.locateQueue(sourceQueueName);
|
||||||
|
|
||||||
|
Assert.assertEquals(messages, sourceQueue.getMessageCount());
|
||||||
|
|
||||||
|
Transfer transfer = new Transfer()
|
||||||
|
.setSourceUser("admin")
|
||||||
|
.setSourcePassword("admin")
|
||||||
|
.setSourceQueue(sourceQueueName)
|
||||||
|
.setTargetUser("admin")
|
||||||
|
.setTargetPassword("admin")
|
||||||
|
.setTargetQueue(targetQueueName);
|
||||||
|
|
||||||
|
if (limit > 0) {
|
||||||
|
transfer.setMessageCount(limit);
|
||||||
|
|
||||||
|
Assert.assertEquals(limit, transfer.execute(new TestActionContext()));
|
||||||
|
|
||||||
|
Queue targetQueue = server.locateQueue(targetQueueName);
|
||||||
|
Assert.assertEquals(messages - limit, sourceQueue.getMessageCount());
|
||||||
|
Assert.assertEquals(limit, targetQueue.getMessageCount());
|
||||||
|
} else {
|
||||||
|
Assert.assertEquals(messages, transfer.execute(new TestActionContext()));
|
||||||
|
|
||||||
|
Queue targetQueue = server.locateQueue(targetQueueName);
|
||||||
|
Assert.assertEquals(0, sourceQueue.getMessageCount());
|
||||||
|
Assert.assertEquals(messages, targetQueue.getMessageCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -523,6 +523,13 @@ public interface QueueControl {
|
||||||
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
|
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
|
||||||
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
|
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
|
||||||
|
|
||||||
|
@Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
|
||||||
|
int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transactions during the operation to avoid OutOfMemory") int flushLimit,
|
||||||
|
@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
|
||||||
|
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
|
||||||
|
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
|
||||||
|
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
|
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1245,8 +1245,17 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
final String filterStr,
|
final String filterStr,
|
||||||
final String otherQueueName,
|
final String otherQueueName,
|
||||||
final boolean rejectDuplicates) throws Exception {
|
final boolean rejectDuplicates) throws Exception {
|
||||||
|
return moveMessages(flushLimit, filterStr, otherQueueName, rejectDuplicates, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int moveMessages(final int flushLimit,
|
||||||
|
final String filterStr,
|
||||||
|
final String otherQueueName,
|
||||||
|
final boolean rejectDuplicates,
|
||||||
|
final int messageCount) throws Exception {
|
||||||
if (AuditLogger.isEnabled()) {
|
if (AuditLogger.isEnabled()) {
|
||||||
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates);
|
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
|
||||||
}
|
}
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
||||||
|
@ -1260,7 +1269,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
|
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, binding);
|
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
|
||||||
return retValue;
|
return retValue;
|
||||||
} finally {
|
} finally {
|
||||||
blockOnIO();
|
blockOnIO();
|
||||||
|
|
|
@ -358,6 +358,13 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
boolean rejectDuplicates,
|
boolean rejectDuplicates,
|
||||||
Binding binding) throws Exception;
|
Binding binding) throws Exception;
|
||||||
|
|
||||||
|
int moveReferences(int flushLimit,
|
||||||
|
Filter filter,
|
||||||
|
SimpleString toAddress,
|
||||||
|
boolean rejectDuplicates,
|
||||||
|
int messageCount,
|
||||||
|
Binding binding) throws Exception;
|
||||||
|
|
||||||
int retryMessages(Filter filter) throws Exception;
|
int retryMessages(Filter filter) throws Exception;
|
||||||
|
|
||||||
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
|
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
|
||||||
|
|
|
@ -2544,9 +2544,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
final SimpleString toAddress,
|
final SimpleString toAddress,
|
||||||
final boolean rejectDuplicates,
|
final boolean rejectDuplicates,
|
||||||
final Binding binding) throws Exception {
|
final Binding binding) throws Exception {
|
||||||
|
return moveReferences(flushLimit, filter, toAddress, rejectDuplicates, -1, binding);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int moveReferences(final int flushLimit,
|
||||||
|
final Filter filter,
|
||||||
|
final SimpleString toAddress,
|
||||||
|
final boolean rejectDuplicates,
|
||||||
|
final int messageCount,
|
||||||
|
final Binding binding) throws Exception {
|
||||||
|
final Integer expectedHits = messageCount > 0 ? messageCount : null;
|
||||||
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
|
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
|
||||||
|
|
||||||
return iterQueue(flushLimit, filter, new QueueIterateAction() {
|
return iterQueue(flushLimit, filter, new QueueIterateAction() {
|
||||||
|
@Override
|
||||||
|
public Integer expectedHits() {
|
||||||
|
return expectedHits;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
|
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
|
||||||
boolean ignored = false;
|
boolean ignored = false;
|
||||||
|
|
|
@ -1459,6 +1459,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addRedistributor(long delay) {
|
public void addRedistributor(long delay) {
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
|
||||||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
||||||
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
|
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
|
@ -1800,6 +1801,44 @@ public class QueueControlTest extends ManagementTestBase {
|
||||||
session.deleteQueue(otherQueue);
|
session.deleteQueue(otherQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveMessagesWithMessageCount() throws Exception {
|
||||||
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString queue = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherAddress = RandomUtil.randomSimpleString();
|
||||||
|
SimpleString otherQueue = RandomUtil.randomSimpleString();
|
||||||
|
|
||||||
|
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
|
||||||
|
session.createQueue(new QueueConfiguration(otherQueue).setAddress(otherAddress).setDurable(durable));
|
||||||
|
ClientProducer producer = session.createProducer(address);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
ClientMessage message = session.createMessage(durable);
|
||||||
|
SimpleString key = RandomUtil.randomSimpleString();
|
||||||
|
long value = RandomUtil.randomLong();
|
||||||
|
message.putLongProperty(key, value);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
|
||||||
|
Assert.assertEquals(10, binding.getQueue().getMessageCount());
|
||||||
|
|
||||||
|
QueueControl queueControl = createManagementControl(address, queue);
|
||||||
|
Assert.assertEquals(10, queueControl.getMessageCount());
|
||||||
|
|
||||||
|
// moved all messages to otherQueue
|
||||||
|
int movedMessagesCount = queueControl.moveMessages(QueueControlImpl.FLUSH_LIMIT, null, otherQueue.toString(), false, 5);
|
||||||
|
Assert.assertEquals(5, movedMessagesCount);
|
||||||
|
Assert.assertEquals(5, queueControl.getMessageCount());
|
||||||
|
|
||||||
|
consumeMessages(5, session, queue);
|
||||||
|
|
||||||
|
consumeMessages(5, session, otherQueue);
|
||||||
|
|
||||||
|
session.deleteQueue(queue);
|
||||||
|
session.deleteQueue(otherQueue);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMoveMessage() throws Exception {
|
public void testMoveMessage() throws Exception {
|
||||||
SimpleString address = RandomUtil.randomSimpleString();
|
SimpleString address = RandomUtil.randomSimpleString();
|
||||||
|
|
|
@ -456,6 +456,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
||||||
return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates);
|
return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int moveMessages(int flushLimit, String filter, String otherQueueName, boolean rejectDuplicates, int messageCount) throws Exception {
|
||||||
|
return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int moveMessages(final String filter,
|
public int moveMessages(final String filter,
|
||||||
final String otherQueueName,
|
final String otherQueueName,
|
||||||
|
|
|
@ -917,6 +917,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void forceDelivery() {
|
public void forceDelivery() {
|
||||||
// no-op
|
// no-op
|
||||||
|
|
Loading…
Reference in New Issue