This commit is contained in:
Clebert Suconic 2021-05-24 14:09:20 -04:00
commit 0f2c4f295e
11 changed files with 417 additions and 12 deletions

View File

@ -105,10 +105,207 @@ public class Transfer extends InputAbstract {
@Option(name = "--target-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
String targetTopic;
boolean isCopy() {
@Option(name = "--message-count", description = "Number of messages to transfer.")
int messageCount = Integer.MAX_VALUE;
public String getSourceURL() {
return sourceURL;
}
public Transfer setSourceURL(String sourceURL) {
this.sourceURL = sourceURL;
return this;
}
public String getSourceUser() {
return sourceUser;
}
public Transfer setSourceUser(String sourceUser) {
this.sourceUser = sourceUser;
return this;
}
public String getSourcePassword() {
return sourcePassword;
}
public Transfer setSourcePassword(String sourcePassword) {
this.sourcePassword = sourcePassword;
return this;
}
public String getTargetURL() {
return targetURL;
}
public Transfer setTargetURL(String targetURL) {
this.targetURL = targetURL;
return this;
}
public String getTargetUser() {
return targetUser;
}
public Transfer setTargetUser(String targetUser) {
this.targetUser = targetUser;
return this;
}
public String getTargetPassword() {
return targetPassword;
}
public Transfer setTargetPassword(String targetPassword) {
this.targetPassword = targetPassword;
return this;
}
public int getReceiveTimeout() {
return receiveTimeout;
}
public Transfer setReceiveTimeout(int receiveTimeout) {
this.receiveTimeout = receiveTimeout;
return this;
}
public String getSourceClientID() {
return sourceClientID;
}
public Transfer setSourceClientID(String sourceClientID) {
this.sourceClientID = sourceClientID;
return this;
}
public String getSourceProtocol() {
return sourceProtocol;
}
public Transfer setSourceProtocol(String sourceProtocol) {
this.sourceProtocol = sourceProtocol;
return this;
}
public String getSourceQueue() {
return sourceQueue;
}
public Transfer setSourceQueue(String sourceQueue) {
this.sourceQueue = sourceQueue;
return this;
}
public String getSharedDurableSubscription() {
return sharedDurableSubscription;
}
public Transfer setSharedDurableSubscription(String sharedDurableSubscription) {
this.sharedDurableSubscription = sharedDurableSubscription;
return this;
}
public String getSharedSubscription() {
return sharedSubscription;
}
public Transfer setSharedSubscription(String sharedSubscription) {
this.sharedSubscription = sharedSubscription;
return this;
}
public String getDurableConsumer() {
return durableConsumer;
}
public Transfer setDurableConsumer(String durableConsumer) {
this.durableConsumer = durableConsumer;
return this;
}
public boolean isNoLocal() {
return noLocal;
}
public Transfer setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
return this;
}
public String getSourceTopic() {
return sourceTopic;
}
public Transfer setSourceTopic(String sourceTopic) {
this.sourceTopic = sourceTopic;
return this;
}
public String getFilter() {
return filter;
}
public Transfer setFilter(String filter) {
this.filter = filter;
return this;
}
public String getTargetProtocol() {
return targetProtocol;
}
public Transfer setTargetProtocol(String targetProtocol) {
this.targetProtocol = targetProtocol;
return this;
}
public int getCommitInterval() {
return commitInterval;
}
public Transfer setCommitInterval(int commitInterval) {
this.commitInterval = commitInterval;
return this;
}
public boolean isCopy() {
return copy;
}
public Transfer setCopy(boolean copy) {
this.copy = copy;
return this;
}
public String getTargetQueue() {
return targetQueue;
}
public Transfer setTargetQueue(String targetQueue) {
this.targetQueue = targetQueue;
return this;
}
public String getTargetTopic() {
return targetTopic;
}
public Transfer setTargetTopic(String targetTopic) {
this.targetTopic = targetTopic;
return this;
}
public int getMessageCount() {
return messageCount;
}
public Transfer setMessageCount(int messageCount) {
this.messageCount = messageCount;
return this;
}
@SuppressWarnings("StringEquality")
@Override
public Object execute(ActionContext context) throws Exception {
@ -183,7 +380,7 @@ public class Transfer extends InputAbstract {
sourceConnection.start();
int pending = 0, total = 0;
while (true) {
while (total < messageCount) {
Message receivedMessage;
if (receiveTimeout < 0) {
@ -231,7 +428,7 @@ public class Transfer extends InputAbstract {
sourceConnection.close();
targetConnection.close();
return null;
return total;
}
Destination createDestination(String role, Session session, String queue, String topic) throws Exception {

View File

@ -37,6 +37,7 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.util.ArrayList;
@ -142,18 +143,30 @@ public class CliTestBase {
}
protected List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
MessageConsumer consumer = session.createConsumer(destination);
List<Message> messages = new ArrayList<>();
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
try (MessageConsumer consumer = session.createConsumer(destination)) {
for (int i = 0; i < noMessages; i++) {
Message m = consumer.receive(1000);
assertNotNull(m);
messages.add(m);
}
}
return messages;
}
protected void produceMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
try (MessageProducer producer = session.createProducer(destination)) {
for (int i = 0; i < noMessages; i++) {
producer.send(session.createTextMessage("test message: " + i));
}
}
}
Destination getDestination(String queueName) {
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.cli.test;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.management.ManagementContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Session;
public class TransferTest extends CliTestBase {
private Connection connection;
private ActiveMQConnectionFactory cf;
private ActiveMQServer server;
private static final int TEST_MESSAGE_COUNT = 10;
@Before
@Override
public void setup() throws Exception {
setupAuth();
super.setup();
server = ((Pair<ManagementContext, ActiveMQServer>)startServer()).getB();
cf = getConnectionFactory(61616);
connection = cf.createConnection("admin", "admin");
}
@After
@Override
public void tearDown() throws Exception {
closeConnection(cf, connection);
super.tearDown();
}
@Test
public void testTransferMessages() throws Exception {
testTransferMessages(TEST_MESSAGE_COUNT, 0);
}
@Test
public void testTransferMessagesWithMessageCount() throws Exception {
testTransferMessages(TEST_MESSAGE_COUNT, 5);
}
private void testTransferMessages(int messages, int limit) throws Exception {
String sourceQueueName = "SOURCE_QUEUE";
String targetQueueName = "TARGET_QUEUE";
Session session = createSession(connection);
produceMessages(session, sourceQueueName, messages, false);
Queue sourceQueue = server.locateQueue(sourceQueueName);
Assert.assertEquals(messages, sourceQueue.getMessageCount());
Transfer transfer = new Transfer()
.setSourceUser("admin")
.setSourcePassword("admin")
.setSourceQueue(sourceQueueName)
.setTargetUser("admin")
.setTargetPassword("admin")
.setTargetQueue(targetQueueName);
if (limit > 0) {
transfer.setMessageCount(limit);
Assert.assertEquals(limit, transfer.execute(new TestActionContext()));
Queue targetQueue = server.locateQueue(targetQueueName);
Assert.assertEquals(messages - limit, sourceQueue.getMessageCount());
Assert.assertEquals(limit, targetQueue.getMessageCount());
} else {
Assert.assertEquals(messages, transfer.execute(new TestActionContext()));
Queue targetQueue = server.locateQueue(targetQueueName);
Assert.assertEquals(0, sourceQueue.getMessageCount());
Assert.assertEquals(messages, targetQueue.getMessageCount());
}
}
}

View File

@ -523,6 +523,13 @@ public interface QueueControl {
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
@Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transactions during the operation to avoid OutOfMemory") int flushLimit,
@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;
/**
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
*

View File

@ -1245,8 +1245,17 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String filterStr,
final String otherQueueName,
final boolean rejectDuplicates) throws Exception {
return moveMessages(flushLimit, filterStr, otherQueueName, rejectDuplicates, -1);
}
@Override
public int moveMessages(final int flushLimit,
final String filterStr,
final String otherQueueName,
final boolean rejectDuplicates,
final int messageCount) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates);
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
}
checkStarted();
@ -1260,7 +1269,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
}
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, binding);
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
return retValue;
} finally {
blockOnIO();

View File

@ -358,6 +358,13 @@ public interface Queue extends Bindable,CriticalComponent {
boolean rejectDuplicates,
Binding binding) throws Exception;
int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates,
int messageCount,
Binding binding) throws Exception;
int retryMessages(Filter filter) throws Exception;
default int retryMessages(Filter filter, Integer expectedHits) throws Exception {

View File

@ -2544,9 +2544,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final SimpleString toAddress,
final boolean rejectDuplicates,
final Binding binding) throws Exception {
return moveReferences(flushLimit, filter, toAddress, rejectDuplicates, -1, binding);
}
@Override
public int moveReferences(final int flushLimit,
final Filter filter,
final SimpleString toAddress,
final boolean rejectDuplicates,
final int messageCount,
final Binding binding) throws Exception {
final Integer expectedHits = messageCount > 0 ? messageCount : null;
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);
return iterQueue(flushLimit, filter, new QueueIterateAction() {
@Override
public Integer expectedHits() {
return expectedHits;
}
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
boolean ignored = false;

View File

@ -1459,6 +1459,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
@Override
public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception {
return 0;
}
@Override
public void addRedistributor(long delay) {

View File

@ -63,6 +63,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
@ -1800,6 +1801,44 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(otherQueue);
}
@Test
public void testMoveMessagesWithMessageCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString otherAddress = RandomUtil.randomSimpleString();
SimpleString otherQueue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
session.createQueue(new QueueConfiguration(otherQueue).setAddress(otherAddress).setDurable(durable));
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 10; i++) {
ClientMessage message = session.createMessage(durable);
SimpleString key = RandomUtil.randomSimpleString();
long value = RandomUtil.randomLong();
message.putLongProperty(key, value);
producer.send(message);
}
final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue);
Assert.assertEquals(10, binding.getQueue().getMessageCount());
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(10, queueControl.getMessageCount());
// moved all messages to otherQueue
int movedMessagesCount = queueControl.moveMessages(QueueControlImpl.FLUSH_LIMIT, null, otherQueue.toString(), false, 5);
Assert.assertEquals(5, movedMessagesCount);
Assert.assertEquals(5, queueControl.getMessageCount());
consumeMessages(5, session, queue);
consumeMessages(5, session, otherQueue);
session.deleteQueue(queue);
session.deleteQueue(otherQueue);
}
@Test
public void testMoveMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -456,6 +456,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates);
}
@Override
public int moveMessages(int flushLimit, String filter, String otherQueueName, boolean rejectDuplicates, int messageCount) throws Exception {
return (Integer) proxy.invokeOperation("moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount);
}
@Override
public int moveMessages(final String filter,
final String otherQueueName,

View File

@ -917,6 +917,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return 0;
}
@Override
public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress, boolean rejectDuplicates, int messageCount, Binding binding) throws Exception {
return 0;
}
@Override
public void forceDelivery() {
// no-op