diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java index 61c6d6bd61..24e56b2e58 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java @@ -30,6 +30,14 @@ public final class XmlDataConstants { static final String DOCUMENT_PARENT = "activemq-journal"; static final String BINDINGS_PARENT = "bindings"; + // used on importing data from 1.x + static final String OLD_BINDING = "binding"; + static final String OLD_ADDRESS = "address"; + static final String OLD_FILTER = "filter-string"; + static final String OLD_QUEUE = "queue-name"; + + + static final String QUEUE_BINDINGS_CHILD = "queue-binding"; static final String QUEUE_BINDING_ADDRESS = "address"; static final String QUEUE_BINDING_FILTER_STRING = "filter-string"; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java index a824177c81..b68479994d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -37,10 +37,13 @@ import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import io.airlift.airline.Command; @@ -61,6 +64,8 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.cli.commands.ActionAbstract; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -93,6 +98,8 @@ public final class XmlDataImporter extends ActionAbstract { String tempFileName = ""; + HashMap oldPrefixTranslation = new HashMap<>(); + private ClientSession session; @Option(name = "--host", description = "The host used to import the data (default localhost)") @@ -113,6 +120,14 @@ public final class XmlDataImporter extends ActionAbstract { @Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true) public String input = "exp.dmp"; + @Option(name = "--sort", description = "Sort the messages from the input (used for older versions that won't sort messages)") + public boolean sort = false; + + @Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports") + public boolean legacyPrefixes = false; + + TreeSet messages; + public String getPassword() { return password; } @@ -224,13 +239,29 @@ public final class XmlDataImporter extends ActionAbstract { } private void processXml() throws Exception { + if (sort) { + messages = new TreeSet(new Comparator() { + @Override + public int compare(MessageTemp o1, MessageTemp o2) { + if (o1.id == o2.id) { + return 0; + } else if (o1.id > o2.id) { + return 1; + } else { + return -1; + } + } + }); + } try { while (reader.hasNext()) { if (logger.isDebugEnabled()) { logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] "); } if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { - if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) { + if (XmlDataConstants.OLD_BINDING.equals(reader.getLocalName())) { + oldBinding(); // export from 1.x + } else if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) { bindQueue(); } else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) { bindAddress(); @@ -241,6 +272,12 @@ public final class XmlDataImporter extends ActionAbstract { reader.next(); } + if (sort) { + for (MessageTemp msgtmp : messages) { + sendMessage(msgtmp.queues, msgtmp.message); + } + } + if (!session.isAutoCommitSends()) { session.commit(); } @@ -258,6 +295,7 @@ public final class XmlDataImporter extends ActionAbstract { Byte priority = 0; Long expiration = 0L; Long timestamp = 0L; + Long id = 0L; org.apache.activemq.artemis.utils.UUID userId = null; ArrayList queues = new ArrayList<>(); @@ -280,6 +318,9 @@ public final class XmlDataImporter extends ActionAbstract { case XmlDataConstants.MESSAGE_USER_ID: userId = UUIDGenerator.getInstance().generateUUID(); break; + case XmlDataConstants.MESSAGE_ID: + id = Long.parseLong(reader.getAttributeValue(i)); + break; } } @@ -313,7 +354,25 @@ public final class XmlDataImporter extends ActionAbstract { reader.next(); } - sendMessage(queues, message); + if (sort) { + messages.add(new MessageTemp(id, queues, message)); + } else { + sendMessage(queues, message); + } + } + + + class MessageTemp { + long id; + List queues; + Message message; + + MessageTemp(long id, List queues, Message message) { + this.message = message; + this.queues = queues; + this.message = message; + this.id = id; + } } private Byte getMessageType(String value) { @@ -341,7 +400,7 @@ public final class XmlDataImporter extends ActionAbstract { return type; } - private void sendMessage(ArrayList queues, Message message) throws Exception { + private void sendMessage(List queues, Message message) throws Exception { StringBuilder logMessage = new StringBuilder(); String destination = addressMap.get(queues.get(0)); @@ -400,11 +459,21 @@ public final class XmlDataImporter extends ActionAbstract { private void processMessageQueues(ArrayList queues) { for (int i = 0; i < reader.getAttributeCount(); i++) { if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) { - queues.add(reader.getAttributeValue(i)); + String queueName = reader.getAttributeValue(i); + String translation = checkPrefix(queueName); + queues.add(translation); } } } + private String checkPrefix(String queueName) { + String newQueueName = oldPrefixTranslation.get(queueName); + if (newQueueName == null) { + newQueueName = queueName; + } + return newQueueName; + } + private void processMessageProperties(Message message) { String key = ""; String value = ""; @@ -530,6 +599,88 @@ public final class XmlDataImporter extends ActionAbstract { } } + + private void oldBinding() throws Exception { + String queueName = ""; + String address = ""; + String filter = ""; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.OLD_ADDRESS: + address = reader.getAttributeValue(i); + break; + case XmlDataConstants.OLD_QUEUE: + queueName = reader.getAttributeValue(i); + break; + case XmlDataConstants.OLD_FILTER: + filter = reader.getAttributeValue(i); + break; + } + } + + if (queueName == null || address == null || filter == null) { + // not expected to happen unless someone manually changed the format + throw new IllegalStateException("invalid format, missing queue, address or filter"); + } + + RoutingType routingType = RoutingType.MULTICAST; + + if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + routingType = RoutingType.ANYCAST; + if (!legacyPrefixes) { + String newaddress = address.substring(PacketImpl.OLD_QUEUE_PREFIX.length()); + address = newaddress; + } + } else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + routingType = RoutingType.MULTICAST; + if (!legacyPrefixes) { + String newaddress = address.substring(PacketImpl.OLD_TOPIC_PREFIX.length()); + address = newaddress; + } + } + + if (queueName.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + if (!legacyPrefixes) { + String newQueueName = queueName.substring(PacketImpl.OLD_QUEUE_PREFIX.length()); + oldPrefixTranslation.put(queueName, newQueueName); + queueName = newQueueName; + } + } else if (queueName.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + if (!legacyPrefixes) { + String newQueueName = queueName.substring(PacketImpl.OLD_TOPIC_PREFIX.length()); + oldPrefixTranslation.put(queueName, newQueueName); + queueName = newQueueName; + } + } + + + ClientSession.AddressQuery addressQuery = session.addressQuery(SimpleString.toSimpleString(address)); + + if (!addressQuery.isExists()) { + session.createAddress(SimpleString.toSimpleString(address), routingType, true); + } + + if (!filter.equals(FilterImpl.GENERIC_IGNORED_FILTER)) { + ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName)); + + if (!queueQuery.isExists()) { + session.createQueue(address, routingType, queueName, filter, true); + if (logger.isDebugEnabled()) { + logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")"); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Binding " + queueName + " already exists so won't re-bind."); + } + } + } + + addressMap.put(queueName, address); + } + + private void bindQueue() throws Exception { String queueName = ""; String address = ""; diff --git a/tests/compatibility-tests/src/main/resources/exportimport/export.groovy b/tests/compatibility-tests/src/main/resources/exportimport/export.groovy index 0b6fb73fc6..d7781a1efd 100644 --- a/tests/compatibility-tests/src/main/resources/exportimport/export.groovy +++ b/tests/compatibility-tests/src/main/resources/exportimport/export.groovy @@ -26,9 +26,9 @@ XmlDataExporter exporter = new XmlDataExporter(); exporter.binding = arg[0] + "/sender/data/bindings" exporter.journal = arg[0] + "/sender/data/journal" try { - exporter.largeMessages = arg[0] + "/sender/data/largeMessages" + exporter.largeMessages = arg[0] + "/sender/data/largemessages" } catch (Throwable e) { - exporter.largeMessges = arg[0] + "/sender/data/largeMessages" + exporter.largeMessges = arg[0] + "/sender/data/largemessages" } exporter.paging = arg[0] + "/sender/data/paging" diff --git a/tests/compatibility-tests/src/main/resources/exportimport/export1X.groovy b/tests/compatibility-tests/src/main/resources/exportimport/export1X.groovy index 7667533763..f7ea1f278c 100644 --- a/tests/compatibility-tests/src/main/resources/exportimport/export1X.groovy +++ b/tests/compatibility-tests/src/main/resources/exportimport/export1X.groovy @@ -25,9 +25,9 @@ XmlDataExporter exporter = new XmlDataExporter(); exporter.binding = arg[0] + "/sender/data/bindings" exporter.journal = arg[0] + "/sender/data/journal" try { - exporter.largeMessages = arg[0] + "/sender/data/largeMessages" + exporter.largeMessages = arg[0] + "/sender/data/largemessages" } catch (Throwable e) { - exporter.largeMessges = arg[0] + "/sender/data/largeMessages" + exporter.largeMessges = arg[0] + "/sender/data/largemessages" } exporter.paging = arg[0] + "/sender/data/paging" diff --git a/tests/compatibility-tests/src/main/resources/exportimport/import.groovy b/tests/compatibility-tests/src/main/resources/exportimport/import.groovy index 6b9cf9b1f0..d3be0e3e93 100644 --- a/tests/compatibility-tests/src/main/resources/exportimport/import.groovy +++ b/tests/compatibility-tests/src/main/resources/exportimport/import.groovy @@ -22,6 +22,8 @@ System.out.println("Arg::" + arg[0]); File pagingfile = new File(arg[0] + "/sender/data/paging") pagingfile.mkdirs() XmlDataImporter importer = new XmlDataImporter(); +importer.legacyPrefixes = legacy +importer.sort = sort; importer.input = arg[0] + "/journal.export" importer.execute(new ActionContext(System.in, System.out, System.err)) diff --git a/tests/compatibility-tests/src/main/resources/exportimport/import1X.groovy b/tests/compatibility-tests/src/main/resources/exportimport/import1X.groovy deleted file mode 100644 index ceccb0308a..0000000000 --- a/tests/compatibility-tests/src/main/resources/exportimport/import1X.groovy +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.activemq.artemis.cli.commands.ActionContext -import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter - - -System.out.println("Arg::" + arg[0]); -File pagingfile = new File(arg[0] + "/sender/data/paging") -pagingfile.mkdirs() -XmlDataImporter importer = new XmlDataImporter(); - -importer.input = arg[0] + "/journal.export" -importer.execute(new ActionContext(System.in, System.out, System.err)) diff --git a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy index 71d33ef68c..2e0e8a8b09 100644 --- a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy +++ b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy @@ -27,8 +27,20 @@ String clientType = arg[1]; String operation = arg[2]; -String queueName = "queue"; -String topicName = "topic"; +try { + legacyOption = legacy; +} catch (Throwable e) { + legacyOption = false; +} + + +if (legacyOption) { + queueName = "jms.queue.queue" + topicName = "jms.topic.topic" +} else { + queueName = "queue"; + topicName = "topic"; +} int LARGE_MESSAGE_SIZE = 10 * 1024; diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ExportImportTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ExportImportTest.java index 44f9011ea2..d40eb30c98 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ExportImportTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ExportImportTest.java @@ -24,14 +24,12 @@ import java.util.List; import org.apache.activemq.artemis.utils.FileUtil; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; -import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; /** * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: @@ -51,7 +49,7 @@ public class ExportImportTest extends VersionedBaseTest { // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" - @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + @Parameterized.Parameters(name = "server={0}, sender={1}, consumer={2}") public static Collection getParameters() { // we don't need every single version ever released.. // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time @@ -64,7 +62,8 @@ public class ExportImportTest extends VersionedBaseTest { // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE}); - combinations.addAll(combinatory(new Object[]{null}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT})); + combinations.add(new Object[]{null, ONE_FIVE, SNAPSHOT}); + combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT}); return combinations; } @@ -83,17 +82,31 @@ public class ExportImportTest extends VersionedBaseTest { try { stopServer(serverClassloader); } catch (Throwable ignored) { + ignored.printStackTrace(); } try { stopServer(receiverClassloader); } catch (Throwable ignored) { + ignored.printStackTrace(); } } @Test - @Ignore // There's some work to be done on exporter / importer, but I wanted to send it in already public void testSendReceive() throws Throwable { - setVariable(senderClassloader, "persistent", Boolean.TRUE); + internalSendReceive(false); + } + + @Test + public void testSendReceivelegacy() throws Throwable { + if (!sender.equals(SNAPSHOT)) { + // makes no sense on snapshot + internalSendReceive(true); + } + } + + public void internalSendReceive(boolean legacyPrefixes) throws Throwable { + setVariable(senderClassloader, "legacy", false); + setVariable(senderClassloader, "persistent", true); startServer(serverFolder.getRoot(), senderClassloader, "sender"); callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); stopServer(senderClassloader); @@ -104,17 +117,20 @@ public class ExportImportTest extends VersionedBaseTest { callScript(senderClassloader, "exportimport/export.groovy", serverFolder.getRoot().getAbsolutePath()); } - setVariable(receiverClassloader, "persistent", Boolean.TRUE); - startServer(serverFolder.getRoot(), receiverClassloader, "receiver"); + setVariable(receiverClassloader, "legacy", legacyPrefixes); + try { + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "receiver"); + + setVariable(receiverClassloader, "sort", sender.startsWith("ARTEMIS-1")); - if (receiver.startsWith("ARTEMIS-1")) { - callScript(receiverClassloader, "exportimport/import1X.groovy", serverFolder.getRoot().getAbsolutePath()); - } else { callScript(receiverClassloader, "exportimport/import.groovy", serverFolder.getRoot().getAbsolutePath()); - } - setVariable(receiverClassloader, "latch", null); - callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); + setVariable(receiverClassloader, "latch", null); + callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); + } finally { + setVariable(receiverClassloader, "legacy", false); + } } } diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java new file mode 100644 index 0000000000..8849c46273 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JournalCompatibilityTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR; + +/** + * To run this test on the IDE and debug it, run the compatibility-tests through a command line once: + * + * cd /compatibility-tests + * mvn install -Ptests | tee output.log + * + * on the output.log you will see the output generated by {@link #getClasspathProperty(String)} + * + * On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test. + * On Idea you would do the following: + * + * Run->Edit Configuration->Add ArtemisMeshTest and add your properties. + */ +@RunWith(Parameterized.class) +public class JournalCompatibilityTest extends VersionedBaseTest { + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + // we don't need every single version ever released.. + // if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time + List combinations = new ArrayList<>(); + + /* + // during development sometimes is useful to comment out the combinations + // and add the ones you are interested.. example: + */ + // combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); + // combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE}); + + combinations.add(new Object[]{null, TWO_FOUR, SNAPSHOT}); + // the purpose on this one is just to validate the test itself. + /// if it can't run against itself it won't work at all + combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public JournalCompatibilityTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + @Before + public void removeFolder() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + serverFolder.getRoot().mkdirs(); + } + + @After + public void tearDown() { + try { + stopServer(serverClassloader); + } catch (Throwable ignored) { + } + try { + stopServer(receiverClassloader); + } catch (Throwable ignored) { + } + } + + @Test + public void testSendReceive() throws Throwable { + setVariable(senderClassloader, "persistent", true); + startServer(serverFolder.getRoot(), senderClassloader, "journalTest"); + callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages"); + stopServer(senderClassloader); + + setVariable(receiverClassloader, "persistent", true); + startServer(serverFolder.getRoot(), receiverClassloader, "journalTest"); + + setVariable(receiverClassloader, "latch", null); + callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages"); + } + +} +