ARTEMIS-1599 fixing compatibility with 1.x exported file

This commit is contained in:
Clebert Suconic 2018-01-10 18:59:58 -05:00
parent 3e41aa151d
commit da164a2074
9 changed files with 322 additions and 52 deletions

View File

@ -30,6 +30,14 @@ public final class XmlDataConstants {
static final String DOCUMENT_PARENT = "activemq-journal";
static final String BINDINGS_PARENT = "bindings";
// used on importing data from 1.x
static final String OLD_BINDING = "binding";
static final String OLD_ADDRESS = "address";
static final String OLD_FILTER = "filter-string";
static final String OLD_QUEUE = "queue-name";
static final String QUEUE_BINDINGS_CHILD = "queue-binding";
static final String QUEUE_BINDING_ADDRESS = "address";
static final String QUEUE_BINDING_FILTER_STRING = "filter-string";

View File

@ -37,10 +37,13 @@ import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import io.airlift.airline.Command;
@ -61,6 +64,8 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.commands.ActionAbstract;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -93,6 +98,8 @@ public final class XmlDataImporter extends ActionAbstract {
String tempFileName = "";
HashMap<String, String> oldPrefixTranslation = new HashMap<>();
private ClientSession session;
@Option(name = "--host", description = "The host used to import the data (default localhost)")
@ -113,6 +120,14 @@ public final class XmlDataImporter extends ActionAbstract {
@Option(name = "--input", description = "The input file name (default=exp.dmp)", required = true)
public String input = "exp.dmp";
@Option(name = "--sort", description = "Sort the messages from the input (used for older versions that won't sort messages)")
public boolean sort = false;
@Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports")
public boolean legacyPrefixes = false;
TreeSet<MessageTemp> messages;
public String getPassword() {
return password;
}
@ -224,13 +239,29 @@ public final class XmlDataImporter extends ActionAbstract {
}
private void processXml() throws Exception {
if (sort) {
messages = new TreeSet<MessageTemp>(new Comparator<MessageTemp>() {
@Override
public int compare(MessageTemp o1, MessageTemp o2) {
if (o1.id == o2.id) {
return 0;
} else if (o1.id > o2.id) {
return 1;
} else {
return -1;
}
}
});
}
try {
while (reader.hasNext()) {
if (logger.isDebugEnabled()) {
logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
}
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) {
if (XmlDataConstants.OLD_BINDING.equals(reader.getLocalName())) {
oldBinding(); // export from 1.x
} else if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) {
bindQueue();
} else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) {
bindAddress();
@ -241,6 +272,12 @@ public final class XmlDataImporter extends ActionAbstract {
reader.next();
}
if (sort) {
for (MessageTemp msgtmp : messages) {
sendMessage(msgtmp.queues, msgtmp.message);
}
}
if (!session.isAutoCommitSends()) {
session.commit();
}
@ -258,6 +295,7 @@ public final class XmlDataImporter extends ActionAbstract {
Byte priority = 0;
Long expiration = 0L;
Long timestamp = 0L;
Long id = 0L;
org.apache.activemq.artemis.utils.UUID userId = null;
ArrayList<String> queues = new ArrayList<>();
@ -280,6 +318,9 @@ public final class XmlDataImporter extends ActionAbstract {
case XmlDataConstants.MESSAGE_USER_ID:
userId = UUIDGenerator.getInstance().generateUUID();
break;
case XmlDataConstants.MESSAGE_ID:
id = Long.parseLong(reader.getAttributeValue(i));
break;
}
}
@ -313,7 +354,25 @@ public final class XmlDataImporter extends ActionAbstract {
reader.next();
}
sendMessage(queues, message);
if (sort) {
messages.add(new MessageTemp(id, queues, message));
} else {
sendMessage(queues, message);
}
}
class MessageTemp {
long id;
List<String> queues;
Message message;
MessageTemp(long id, List<String> queues, Message message) {
this.message = message;
this.queues = queues;
this.message = message;
this.id = id;
}
}
private Byte getMessageType(String value) {
@ -341,7 +400,7 @@ public final class XmlDataImporter extends ActionAbstract {
return type;
}
private void sendMessage(ArrayList<String> queues, Message message) throws Exception {
private void sendMessage(List<String> queues, Message message) throws Exception {
StringBuilder logMessage = new StringBuilder();
String destination = addressMap.get(queues.get(0));
@ -400,11 +459,21 @@ public final class XmlDataImporter extends ActionAbstract {
private void processMessageQueues(ArrayList<String> queues) {
for (int i = 0; i < reader.getAttributeCount(); i++) {
if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
queues.add(reader.getAttributeValue(i));
String queueName = reader.getAttributeValue(i);
String translation = checkPrefix(queueName);
queues.add(translation);
}
}
}
private String checkPrefix(String queueName) {
String newQueueName = oldPrefixTranslation.get(queueName);
if (newQueueName == null) {
newQueueName = queueName;
}
return newQueueName;
}
private void processMessageProperties(Message message) {
String key = "";
String value = "";
@ -530,6 +599,88 @@ public final class XmlDataImporter extends ActionAbstract {
}
}
private void oldBinding() throws Exception {
String queueName = "";
String address = "";
String filter = "";
for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
switch (attributeName) {
case XmlDataConstants.OLD_ADDRESS:
address = reader.getAttributeValue(i);
break;
case XmlDataConstants.OLD_QUEUE:
queueName = reader.getAttributeValue(i);
break;
case XmlDataConstants.OLD_FILTER:
filter = reader.getAttributeValue(i);
break;
}
}
if (queueName == null || address == null || filter == null) {
// not expected to happen unless someone manually changed the format
throw new IllegalStateException("invalid format, missing queue, address or filter");
}
RoutingType routingType = RoutingType.MULTICAST;
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
routingType = RoutingType.ANYCAST;
if (!legacyPrefixes) {
String newaddress = address.substring(PacketImpl.OLD_QUEUE_PREFIX.length());
address = newaddress;
}
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
routingType = RoutingType.MULTICAST;
if (!legacyPrefixes) {
String newaddress = address.substring(PacketImpl.OLD_TOPIC_PREFIX.length());
address = newaddress;
}
}
if (queueName.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
if (!legacyPrefixes) {
String newQueueName = queueName.substring(PacketImpl.OLD_QUEUE_PREFIX.length());
oldPrefixTranslation.put(queueName, newQueueName);
queueName = newQueueName;
}
} else if (queueName.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
if (!legacyPrefixes) {
String newQueueName = queueName.substring(PacketImpl.OLD_TOPIC_PREFIX.length());
oldPrefixTranslation.put(queueName, newQueueName);
queueName = newQueueName;
}
}
ClientSession.AddressQuery addressQuery = session.addressQuery(SimpleString.toSimpleString(address));
if (!addressQuery.isExists()) {
session.createAddress(SimpleString.toSimpleString(address), routingType, true);
}
if (!filter.equals(FilterImpl.GENERIC_IGNORED_FILTER)) {
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
if (!queueQuery.isExists()) {
session.createQueue(address, routingType, queueName, filter, true);
if (logger.isDebugEnabled()) {
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Binding " + queueName + " already exists so won't re-bind.");
}
}
}
addressMap.put(queueName, address);
}
private void bindQueue() throws Exception {
String queueName = "";
String address = "";

View File

@ -26,9 +26,9 @@ XmlDataExporter exporter = new XmlDataExporter();
exporter.binding = arg[0] + "/sender/data/bindings"
exporter.journal = arg[0] + "/sender/data/journal"
try {
exporter.largeMessages = arg[0] + "/sender/data/largeMessages"
exporter.largeMessages = arg[0] + "/sender/data/largemessages"
} catch (Throwable e) {
exporter.largeMessges = arg[0] + "/sender/data/largeMessages"
exporter.largeMessges = arg[0] + "/sender/data/largemessages"
}
exporter.paging = arg[0] + "/sender/data/paging"

View File

@ -25,9 +25,9 @@ XmlDataExporter exporter = new XmlDataExporter();
exporter.binding = arg[0] + "/sender/data/bindings"
exporter.journal = arg[0] + "/sender/data/journal"
try {
exporter.largeMessages = arg[0] + "/sender/data/largeMessages"
exporter.largeMessages = arg[0] + "/sender/data/largemessages"
} catch (Throwable e) {
exporter.largeMessges = arg[0] + "/sender/data/largeMessages"
exporter.largeMessges = arg[0] + "/sender/data/largemessages"
}
exporter.paging = arg[0] + "/sender/data/paging"

View File

@ -22,6 +22,8 @@ System.out.println("Arg::" + arg[0]);
File pagingfile = new File(arg[0] + "/sender/data/paging")
pagingfile.mkdirs()
XmlDataImporter importer = new XmlDataImporter();
importer.legacyPrefixes = legacy
importer.sort = sort;
importer.input = arg[0] + "/journal.export"
importer.execute(new ActionContext(System.in, System.out, System.err))

View File

@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.activemq.artemis.cli.commands.ActionContext
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter
System.out.println("Arg::" + arg[0]);
File pagingfile = new File(arg[0] + "/sender/data/paging")
pagingfile.mkdirs()
XmlDataImporter importer = new XmlDataImporter();
importer.input = arg[0] + "/journal.export"
importer.execute(new ActionContext(System.in, System.out, System.err))

View File

@ -27,8 +27,20 @@ String clientType = arg[1];
String operation = arg[2];
String queueName = "queue";
String topicName = "topic";
try {
legacyOption = legacy;
} catch (Throwable e) {
legacyOption = false;
}
if (legacyOption) {
queueName = "jms.queue.queue"
topicName = "jms.topic.topic"
} else {
queueName = "queue";
topicName = "topic";
}
int LARGE_MESSAGE_SIZE = 10 * 1024;

View File

@ -24,14 +24,12 @@ import java.util.List;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
@ -51,7 +49,7 @@ public class ExportImportTest extends VersionedBaseTest {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
@Parameterized.Parameters(name = "server={0}, sender={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
@ -64,7 +62,8 @@ public class ExportImportTest extends VersionedBaseTest {
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.addAll(combinatory(new Object[]{null}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT}, new Object[]{ONE_FIVE, TWO_FOUR, SNAPSHOT}));
combinations.add(new Object[]{null, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
return combinations;
}
@ -83,17 +82,31 @@ public class ExportImportTest extends VersionedBaseTest {
try {
stopServer(serverClassloader);
} catch (Throwable ignored) {
ignored.printStackTrace();
}
try {
stopServer(receiverClassloader);
} catch (Throwable ignored) {
ignored.printStackTrace();
}
}
@Test
@Ignore // There's some work to be done on exporter / importer, but I wanted to send it in already
public void testSendReceive() throws Throwable {
setVariable(senderClassloader, "persistent", Boolean.TRUE);
internalSendReceive(false);
}
@Test
public void testSendReceivelegacy() throws Throwable {
if (!sender.equals(SNAPSHOT)) {
// makes no sense on snapshot
internalSendReceive(true);
}
}
public void internalSendReceive(boolean legacyPrefixes) throws Throwable {
setVariable(senderClassloader, "legacy", false);
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "sender");
callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader);
@ -104,17 +117,20 @@ public class ExportImportTest extends VersionedBaseTest {
callScript(senderClassloader, "exportimport/export.groovy", serverFolder.getRoot().getAbsolutePath());
}
setVariable(receiverClassloader, "persistent", Boolean.TRUE);
startServer(serverFolder.getRoot(), receiverClassloader, "receiver");
setVariable(receiverClassloader, "legacy", legacyPrefixes);
try {
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "receiver");
setVariable(receiverClassloader, "sort", sender.startsWith("ARTEMIS-1"));
if (receiver.startsWith("ARTEMIS-1")) {
callScript(receiverClassloader, "exportimport/import1X.groovy", serverFolder.getRoot().getAbsolutePath());
} else {
callScript(receiverClassloader, "exportimport/import.groovy", serverFolder.getRoot().getAbsolutePath());
}
setVariable(receiverClassloader, "latch", null);
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
setVariable(receiverClassloader, "latch", null);
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
} finally {
setVariable(receiverClassloader, "legacy", false);
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_FOUR;
/**
* To run this test on the IDE and debug it, run the compatibility-tests through a command line once:
*
* cd /compatibility-tests
* mvn install -Ptests | tee output.log
*
* on the output.log you will see the output generated by {@link #getClasspathProperty(String)}
*
* On your IDE, edit the Run Configuration to your test and add those -D as parameters to your test.
* On Idea you would do the following:
*
* Run->Edit Configuration->Add ArtemisMeshTest and add your properties.
*/
@RunWith(Parameterized.class)
public class JournalCompatibilityTest extends VersionedBaseTest {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
// we don't need every single version ever released..
// if we keep testing current one against 2.4 and 1.4.. we are sure the wire and API won't change over time
List<Object[]> combinations = new ArrayList<>();
/*
// during development sometimes is useful to comment out the combinations
// and add the ones you are interested.. example:
*/
// combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
// combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{null, TWO_FOUR, SNAPSHOT});
// the purpose on this one is just to validate the test itself.
/// if it can't run against itself it won't work at all
combinations.add(new Object[]{null, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JournalCompatibilityTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void removeFolder() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
serverFolder.getRoot().mkdirs();
}
@After
public void tearDown() {
try {
stopServer(serverClassloader);
} catch (Throwable ignored) {
}
try {
stopServer(receiverClassloader);
} catch (Throwable ignored) {
}
}
@Test
public void testSendReceive() throws Throwable {
setVariable(senderClassloader, "persistent", true);
startServer(serverFolder.getRoot(), senderClassloader, "journalTest");
callScript(senderClassloader, "meshTest/sendMessages.groovy", server, sender, "sendAckMessages");
stopServer(senderClassloader);
setVariable(receiverClassloader, "persistent", true);
startServer(serverFolder.getRoot(), receiverClassloader, "journalTest");
setVariable(receiverClassloader, "latch", null);
callScript(receiverClassloader, "meshTest/sendMessages.groovy", server, receiver, "receiveMessages");
}
}