ARTEMIS-3034 CLI Transfer Option

This commit is contained in:
Clebert Suconic 2020-12-14 17:33:05 -05:00
parent 25ba3c494f
commit 02bcb3195f
7 changed files with 791 additions and 49 deletions

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.cli.commands.Mask;
import org.apache.activemq.artemis.cli.commands.check.HelpCheck;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop;
@ -155,7 +156,7 @@ public class Artemis {
private static Cli.CliBuilder<Action> builder(File artemisInstance) {
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Transfer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
builder.withGroup("check").withDescription("Check tools group (node|queue) (example ./artemis check node)").
withDefaultCommand(HelpCheck.class).withCommands(NodeCheck.class, QueueCheck.class);

View File

@ -17,9 +17,17 @@
package org.apache.activemq.artemis.cli.commands;
import java.io.File;
import java.net.InetAddress;
import java.net.URI;
import java.util.Map;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
public abstract class ActionAbstract implements Action {
@ -67,6 +75,41 @@ public abstract class ActionAbstract implements Action {
return brokerInstance;
}
protected String getBrokerURLInstance() {
if (getBrokerInstance() != null) {
try {
FileConfiguration fileConfiguration = new FileConfiguration();
String brokerConfiguration = new File(new File(getBrokerEtc()), "broker.xml").toURI().toASCIIString();
FileDeploymentManager fileDeploymentManager = new FileDeploymentManager(brokerConfiguration);
fileDeploymentManager.addDeployable(fileConfiguration);
fileDeploymentManager.readConfiguration();
for (TransportConfiguration acceptorConfiguration: fileConfiguration.getAcceptorConfigurations()) {
if (acceptorConfiguration.getName().equals("artemis")) {
Map<String, Object> acceptorParams = acceptorConfiguration.getParams();
String scheme = ConfigurationHelper.getStringProperty(TransportConstants.SCHEME_PROP_NAME, SchemaConstants.TCP, acceptorParams);
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", acceptorParams);
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 61616, acceptorParams);
if (InetAddress.getByName(host).isAnyLocalAddress()) {
host = "localhost";
}
return new URI(scheme, null, host, port, null, null, null).toString();
}
}
} catch (Exception e) {
if (isVerbose()) {
System.out.print("Can not get the broker url instance: " + e.toString());
}
}
}
return null;
}
public String getBrokerEtc() {
if (brokerEtc == null) {
brokerEtc = System.getProperty("artemis.instance.etc");

View File

@ -21,28 +21,18 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import java.io.File;
import java.net.InetAddress;
import java.net.URI;
import java.util.Map;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.qpid.jms.JmsConnectionFactory;
public class ConnectionAbstract extends InputAbstract {
private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
@Option(name = "--url", description = "URL towards the broker. (default: tcp://localhost:61616)")
@Option(name = "--source-url", description = "URL towards the broker. (default: Read from current broker.xml or tcp://localhost:61616 if the default cannot be parsed)")
protected String brokerURL = DEFAULT_BROKER_URL;
@Option(name = "--user", description = "User used to connect")
@ -117,39 +107,6 @@ public class ConnectionAbstract extends InputAbstract {
return null;
}
private String getBrokerURLInstance() {
if (getBrokerInstance() != null) {
try {
FileConfiguration fileConfiguration = new FileConfiguration();
String brokerConfiguration = new File(new File(getBrokerEtc()), "broker.xml").toURI().toASCIIString();
FileDeploymentManager fileDeploymentManager = new FileDeploymentManager(brokerConfiguration);
fileDeploymentManager.addDeployable(fileConfiguration);
fileDeploymentManager.readConfiguration();
for (TransportConfiguration acceptorConfiguration: fileConfiguration.getAcceptorConfigurations()) {
if (acceptorConfiguration.getName().equals("artemis")) {
Map<String, Object> acceptorParams = acceptorConfiguration.getParams();
String scheme = ConfigurationHelper.getStringProperty(TransportConstants.SCHEME_PROP_NAME, SchemaConstants.TCP, acceptorParams);
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", acceptorParams);
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 61616, acceptorParams);
if (InetAddress.getByName(host).isAnyLocalAddress()) {
host = "localhost";
}
return new URI(scheme, null, host, port, null, null, null).toString();
}
}
} catch (Exception e) {
if (isVerbose()) {
System.out.print("Can not get the broker url instance: " + e.toString());
}
}
}
return null;
}
protected ConnectionFactory createConnectionFactory() throws Exception {
if (protocol.equals("core")) {
return createCoreConnectionFactory();

View File

@ -0,0 +1,361 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
@Command(name = "transfer", description = "Moves Messages from one destination towards another destination")
public class Transfer extends InputAbstract {
private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
@Option(name = "--source-url", description = "URL towards the broker. (default: Read from current broker.xml or tcp://localhost:61616 if the default cannot be parsed)")
protected String sourceURL = DEFAULT_BROKER_URL;
@Option(name = "--source-user", description = "User used to connect")
protected String sourceUser;
@Option(name = "--source-password", description = "Password used to connect")
protected String sourcePassword;
@Option(name = "--target-url", description = "URL towards the broker. (default: Read from current broker.xml or tcp://localhost:61616 if the default cannot be parsed)")
protected String targetURL = DEFAULT_BROKER_URL;
@Option(name = "--target-user", description = "User used to connect")
protected String targetUser;
@Option(name = "--target-password", description = "Password used to connect")
protected String targetPassword;
@Option(name = "--receive-timeout", description = "Amount of time (in milliseconds) to wait before giving up the loop. 0 means receiveNoWait, -1 means consumer.receive() waiting forever. (default=5000)")
int receiveTimeout = 5000;
@Option(name = "--source-client-id", description = "ClientID to be associated with connection")
String sourceClientID;
@Option(name = "--source-protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
String sourceProtocol = "core";
@Option(name = "--source-queue", description = "JMS Queue to be used.")
String sourceQueue;
@Option(name = "--shared-durable-subscription", description = "Name of a shared subscription name to be used on the input topic")
String sharedDurableSubscription;
@Option(name = "--shared-subscription", description = "Name of a shared subscription name to be used on the input topic")
String sharedSubscription;
@Option(name = "--durable-consumer", description = "Name of a durable consumer to be used on the input topic")
String durableConsumer;
@Option(name = "--no-Local", description = "Use noLocal when applicable on topic operation")
boolean noLocal;
@Option(name = "--source-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
String sourceTopic;
@Option(name = "--source-filter", description = "filter to be used with the consumer")
String filter;
@Option(name = "--target-protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
String targetProtocol = "core";
@Option(name = "--commit-interval", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
int commitInterval = 1000;
@Option(name = "--copy", description = "If this option is chosen we will perform a copy of the queue by rolling back the original TX on the source.")
boolean copy;
@Option(name = "--target-queue", description = "JMS Queue to be used.")
String targetQueue;
@Option(name = "--target-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
String targetTopic;
boolean isCopy() {
return copy;
}
@SuppressWarnings("StringEquality")
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
// it is intentional to make a comparison on the String object here
// this is to test if the original option was switched or not.
// we don't care about being .equals at all.
// as a matter of fact if you pass brokerURL in a way it's equals to DEFAULT_BROKER_URL,
// we should not the broker URL Instance
// and still honor the one passed by parameter.
// SupressWarnings was added to this method to supress the false positive here from error-prone.
if (sourceURL == DEFAULT_BROKER_URL) {
String brokerURLInstance = getBrokerURLInstance();
if (brokerURLInstance != null) {
sourceURL = brokerURLInstance;
}
}
System.out.println("Connection brokerURL = " + sourceURL);
ConnectionFactory sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID);
Connection sourceConnection = sourceConnectionFactory.createConnection();
Session sourceSession = sourceConnection.createSession(Session.SESSION_TRANSACTED);
Destination sourceDestination = createDestination("source", sourceSession, sourceQueue, sourceTopic);
MessageConsumer consumer = null;
if (sourceDestination instanceof Queue) {
if (filter != null) {
consumer = sourceSession.createConsumer(sourceDestination, filter);
} else {
consumer = sourceSession.createConsumer(sourceDestination);
}
} else if (sourceDestination instanceof Topic) {
Topic topic = (Topic) sourceDestination;
if (durableConsumer != null) {
if (filter != null) {
consumer = sourceSession.createDurableConsumer(topic, durableConsumer);
} else {
consumer = sourceSession.createDurableConsumer(topic, durableConsumer, filter, noLocal);
}
} else if (sharedDurableSubscription != null) {
if (filter != null) {
consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription, filter);
} else {
consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription);
}
} else if (sharedSubscription != null) {
if (filter != null) {
consumer = sourceSession.createSharedConsumer(topic, sharedSubscription, filter);
} else {
consumer = sourceSession.createSharedConsumer(topic, sharedSubscription);
}
} else {
throw new IllegalArgumentException("you have to specify --durable-consumer, --shared-durable-subscription or --shared-subscription with a topic");
}
}
ConnectionFactory targetConnectionFactory = createConnectionFactory("target", targetProtocol, targetURL, targetUser, targetPassword, null);
Connection targetConnection = targetConnectionFactory.createConnection();
Session targetSession = targetConnection.createSession(Session.SESSION_TRANSACTED);
Destination targetDestination = createDestination("target", targetSession, targetQueue, targetTopic);
MessageProducer producer = targetSession.createProducer(targetDestination);
if (sourceURL.equals(targetURL) && sourceDestination.equals(targetDestination)) {
System.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion.");
throw new IllegalArgumentException("cannot use " + sourceDestination + " == " + targetDestination);
}
sourceConnection.start();
int pending = 0, total = 0;
while (true) {
Message receivedMessage;
if (receiveTimeout < 0) {
receivedMessage = consumer.receive();
} else if (receiveTimeout == 0) {
receivedMessage = consumer.receiveNoWait();
} else {
receivedMessage = consumer.receive(receiveTimeout);
}
if (receivedMessage == null) {
if (isVerbose()) {
System.out.println("could not receive any more messages");
}
break;
}
producer.send(receivedMessage);
pending++;
total++;
if (isVerbose()) {
System.out.println("Received message " + total + " with " + pending + " messages pending to be commited");
}
if (pending > commitInterval) {
System.out.println("Transferred " + pending + " messages of " + total);
pending = 0;
targetSession.commit();
if (!isCopy()) {
sourceSession.commit();
}
}
}
System.out.println("Transferred a total of " + total + " messages");
if (pending != 0) {
targetSession.commit();
if (isCopy()) {
sourceSession.rollback();
} else {
sourceSession.commit();
}
}
sourceConnection.close();
targetConnection.close();
return null;
}
Destination createDestination(String role, Session session, String queue, String topic) throws Exception {
if (queue != null && topic != null) {
throw new IllegalArgumentException("Cannot have topic and queue passed as " + role);
}
if (queue != null) {
return session.createQueue(queue);
}
if (topic != null) {
return session.createTopic(topic);
}
throw new IllegalArgumentException("You need to pass either a topic or a queue as " + role);
}
protected ConnectionFactory createConnectionFactory(String role,
String protocol,
String brokerURL,
String user,
String password,
String clientID) throws Exception {
if (protocol.equals("core")) {
if (isVerbose()) {
System.out.println("Creating " + role + " CORE Connection towards " + brokerURL);
}
return createCoreConnectionFactory(brokerURL, user, password, clientID);
} else if (protocol.equals("amqp")) {
if (isVerbose()) {
System.out.println("Creating " + role + " AMQP Connection towards " + brokerURL);
}
return createAMQPConnectionFactory(brokerURL, user, password, clientID);
} else {
throw new IllegalStateException("protocol " + protocol + " not supported");
}
}
private ConnectionFactory createAMQPConnectionFactory(String brokerURL,
String user,
String password,
String clientID) {
if (brokerURL.startsWith("tcp://")) {
// replacing tcp:// by amqp://
brokerURL = "amqp" + brokerURL.substring(3);
}
JmsConnectionFactory cf = new JmsConnectionFactory(user, password, brokerURL);
if (clientID != null) {
cf.setClientID(clientID);
}
try {
Connection connection = cf.createConnection();
connection.close();
return cf;
} catch (JMSSecurityException e) {
// if a security exception will get the user and password through an input
context.err.println("Connection failed::" + e.getMessage());
userPassword(brokerURL);
cf = new JmsConnectionFactory(user, password, brokerURL);
if (clientID != null) {
cf.setClientID(clientID);
}
return cf;
} catch (JMSException e) {
// if a connection exception will ask for the URL, user and password
context.err.println("Connection failed::" + e.getMessage());
brokerURL = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", brokerURL);
userPassword(brokerURL);
cf = new JmsConnectionFactory(user, password, brokerURL);
if (clientID != null) {
cf.setClientID(clientID);
}
return cf;
}
}
protected ActiveMQConnectionFactory createCoreConnectionFactory(String brokerURL,
String user,
String password,
String clientID) {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
if (clientID != null) {
System.out.println("Consumer:: clientID = " + clientID);
cf.setClientID(clientID);
}
try {
Connection connection = cf.createConnection();
connection.close();
return cf;
} catch (JMSSecurityException e) {
// if a security exception will get the user and password through an input
if (context != null) {
context.err.println("Connection failed::" + e.getMessage());
}
Pair<String, String> userPair = userPassword(brokerURL);
cf = new ActiveMQConnectionFactory(brokerURL, userPair.getA(), userPair.getB());
if (clientID != null) {
cf.setClientID(clientID);
}
return cf;
} catch (JMSException e) {
// if a connection exception will ask for the URL, user and password
if (context != null) {
context.err.println("Connection failed::" + e.getMessage());
}
brokerURL = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", brokerURL);
Pair<String, String> userPair = userPassword(brokerURL);
cf = new ActiveMQConnectionFactory(brokerURL, userPair.getA(), userPair.getB());
if (clientID != null) {
cf.setClientID(clientID);
}
return cf;
}
}
Pair<String, String> userPassword(String uri) {
System.out.println("Type in user/password towards " + uri);
String user, password;
user = input("--user", "Type the username for a retry", null);
password = inputPassword("--password", "Type the password for a retry", null);
return new Pair<>(user, password);
}
}

View File

@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
@ -71,16 +72,31 @@ public class ServerUtil {
private static Process internalStartServer(String artemisInstance,
String serverName) throws IOException, ClassNotFoundException {
return execute(artemisInstance, serverName, "run");
}
public static Process execute(String artemisInstance, String jobName, String...args) throws IOException, ClassNotFoundException {
try {
boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
ArrayList<String> command = new ArrayList<>();
ProcessBuilder builder = null;
if (IS_WINDOWS) {
builder = new ProcessBuilder("cmd", "/c", "artemis.cmd", "run");
command.add("cmd");
command.add("/c");
command.add("artemis.cmd");
} else {
builder = new ProcessBuilder("./artemis", "run");
command.add("./artemis");
}
for (String arg: args) {
command.add(arg);
}
builder = new ProcessBuilder(command);
builder.directory(new File(artemisInstance + "/bin"));
final Process process = builder.start();
@ -91,12 +107,12 @@ public class ServerUtil {
}
});
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), serverName, false);
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), jobName, false);
outputLogger.start();
// Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
// http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), serverName, true);
ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), jobName, true);
errorLogger.start();
return process;
} catch (IOException e) {

View File

@ -608,6 +608,38 @@
</args>
</configuration>
</execution>
<!-- used on TransferTest -->
<execution>
<phase>test-compile</phase>
<id>create-transfer-1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<noWeb>true</noWeb>
<instance>${basedir}/target/transfer1</instance>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-transfer-2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<noWeb>true</noWeb>
<portOffset>100</portOffset>
<instance>${basedir}/target/transfer2</instance>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>

View File

@ -0,0 +1,332 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.transfer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestTransfer extends SmokeTestBase {
public static final String SERVER_NAME_0 = "transfer1";
public static final String SERVER_NAME_1 = "transfer2";
private static final int NUMBER_OF_MESSAGES = 200;
private static final int PARTIAL_MESSAGES = 10;
String sourceTransferProtocol = "amqp";
String targetTransferProtocol = "amqp";
String senderProtocol = "amqp";
String consumerProtocol = "amqp";
public TestTransfer(String sender, String consumer, String source, String target) {
this.senderProtocol = sender;
this.consumerProtocol = consumer;
this.sourceTransferProtocol = source;
this.targetTransferProtocol = target;
}
@Parameterized.Parameters(name = "sender={0}, consumer={1}, sourceOnTransfer={2}, targetOnTransfer={3}")
public static Collection<Object[]> getParams() {
String[] protocols = new String[]{"core", "amqp"};
ArrayList<Object[]> parameters = new ArrayList<>();
for (int i = 0; i < protocols.length; i++) {
for (int j = 0; j < protocols.length; j++) {
// sender and sourceOnTransfer have to be the same
// consumer and targetOnTransfer have to be the same
// this is because AMQP Shared Subscription will create a different queue than core
String[] parameter = new String[]{protocols[i], protocols[j], protocols[i], protocols[j]};
parameters.add(parameter);
}
}
return parameters;
}
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else {
throw new IllegalStateException("Unkown:" + protocol);
}
}
private ConnectionFactory createConsumerCF() {
return createConnectionFactory(consumerProtocol, "tcp://localhost:61716");
}
private ConnectionFactory createSenderCF() {
return createConnectionFactory(senderProtocol, "tcp://localhost:61616");
}
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
disableCheckThread();
startServer(SERVER_NAME_0, 0, 30000);
startServer(SERVER_NAME_1, 100, 30000);
}
@Test
public void testTransferSimpleQueueCopy() throws Exception {
internalTransferSimpleQueue(false);
}
@Test
public void testTransferSimpleQueue() throws Exception {
internalTransferSimpleQueue(true);
}
public String getQueueName() {
return getName();
}
public String getTopicName() {
return "Topic" + getName();
}
private void internalTransferSimpleQueue(boolean copy) throws Exception {
ConnectionFactory factory = createSenderCF();
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
String[] argsArray = new String[]{"transfer", "--target-url", "tcp://localhost:61716", "--source-queue", getQueueName(), "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "0"};
if (copy) {
ArrayList<String> copyArgs = new ArrayList<>();
for (String a : argsArray) {
copyArgs.add(a);
}
if (copy) {
copyArgs.add("--copy");
}
argsArray = copyArgs.toArray(new String[copyArgs.size()]);
}
Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", argsArray);
transferProcess.waitFor();
ConnectionFactory factoryTarget = createConsumerCF();
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);
Queue queueTarget = sessionTarget.createQueue(getQueueName());
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage received = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(received);
Assert.assertEquals("hello " + i, received.getText());
}
sessionTarget.commit();
Assert.assertNull(consumer.receiveNoWait());
MessageConsumer consumerSource = session.createConsumer(queue);
connection.start();
if (copy) {
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage received = (TextMessage) consumerSource.receive(1000);
Assert.assertNotNull(received);
Assert.assertEquals("hello " + i, received.getText());
}
}
Assert.assertNull(consumerSource.receiveNoWait());
connection.close();
connectionTarget.close();
}
@Test
public void testDurableSharedSubscrition() throws Exception {
ConnectionFactory factory = createSenderCF();
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(getTopicName());
MessageConsumer subscription = session.createSharedDurableConsumer(topic, "testSubs");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
subscription.close();
Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--shared-durable-subscription", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "1000", "--verbose");
transferProcess.waitFor();
ConnectionFactory factoryTarget = createConsumerCF();
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);
Queue queueTarget = sessionTarget.createQueue(getQueueName());
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage received = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(received);
}
sessionTarget.commit();
Assert.assertNull(consumer.receiveNoWait());
connection.close();
connectionTarget.close();
}
@Test
public void testSharedSubscrition() throws Exception {
ConnectionFactory factory = createSenderCF();
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(getTopicName());
MessageConsumer subscription = session.createSharedConsumer(topic, "testSubs");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// I can't just send a few messages.. I have to send more messages that would be stuck on delivering mode
// since the transfer will share the consumer with the shared consumer
// and the temporary queue would be removed if i closed it earlier.
for (int i = 0; i < 2000; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--shared-subscription", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "0", "--verbose");
transferProcess.waitFor();
// this test is a bit tricky as the subscription would be removed when the consumer is gone...
// I'm adding a test for completion only
// and the subscription has to be closed only after the transfer,
// which will not receive all the messages as some messages will be in delivering mode
subscription.close();
ConnectionFactory factoryTarget = createConsumerCF();
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);
Queue queueTarget = sessionTarget.createQueue(getQueueName());
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);
// we are keeping a non durable subscription so the temporary queue still up
// I'm not going to bother about being too strict about the content, just that some messages arrived
for (int i = 0; i < PARTIAL_MESSAGES; i++) {
TextMessage received = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(received);
}
sessionTarget.commit();
connection.close();
connectionTarget.close();
}
@Test
public void testDurableConsumer() throws Exception {
ConnectionFactory factory = createSenderCF();
Connection connection = factory.createConnection();
connection.setClientID("test");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(getTopicName());
MessageConsumer subscription = session.createDurableConsumer(topic, "testSubs");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
connection.close();
subscription.close();
Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--source-client-id", "test", "--durable-consumer", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "1000", "--verbose", "--silent");
transferProcess.waitFor();
ConnectionFactory factoryTarget = createConsumerCF();
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);
Queue queueTarget = sessionTarget.createQueue(getQueueName());
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);
// we are keeping a non durable subscription so the temporary queue still up
// I'm not going to bother about being too strict about the content, just that some messages arrived
for (int i = 0; i < PARTIAL_MESSAGES; i++) {
TextMessage received = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(received);
}
sessionTarget.commit();
connectionTarget.close();
}
}