ARTEMIS-5097 Allow export and import data of undefined queues

This will allow users eventually undoing mistakes after removing a queue by accident.
This commit is contained in:
Clebert Suconic 2024-10-11 11:04:41 -04:00 committed by clebertsuconic
parent c58210cc29
commit 8e4bc33dc4
5 changed files with 178 additions and 3 deletions

View File

@ -80,6 +80,11 @@ public final class XmlDataExporter extends DBOption {
private XMLStreamWriter xmlWriter;
private Throwable lastError;
@Option(names = "undefined-prefix", description = "In case a queue does not exist, this will define the prefix to be used on the message export. Default: 'UndefinedQueue_'")
private String undefinedPrefix = "UndefinedQueue_";
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<>();
@ -100,6 +105,15 @@ public final class XmlDataExporter extends DBOption {
XMLMessageExporter exporter;
public String getUndefinedPrefix() {
return undefinedPrefix;
}
public XmlDataExporter setUndefinedPrefix(String undefinedPrefix) {
this.undefinedPrefix = undefinedPrefix;
return this;
}
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
@ -331,11 +345,16 @@ public final class XmlDataExporter extends DBOption {
xmlWriter.writeEndDocument();
xmlWriter.flush();
xmlWriter.close();
} catch (Exception e) {
} catch (Throwable e) {
e.printStackTrace();
this.lastError = e;
}
}
public Throwable getLastError() {
return lastError;
}
private void printBindingsAsXML() throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
for (Map.Entry<Long, PersistentAddressBindingEncoding> addressBindingEncodingEntry : addressBindings.entrySet()) {
@ -475,7 +494,22 @@ public final class XmlDataExporter extends DBOption {
private List<String> extractQueueNames(HashMap<Long, DescribeJournal.ReferenceDescribe> refMap) {
List<String> queues = new ArrayList<>();
for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
String queueName;
PersistentQueueBindingEncoding persistentQueueBindingEncoding = queueBindings.get(ref.refEncoding.queueID);
if (persistentQueueBindingEncoding == null) {
PersistentQueueBindingEncoding undefinedQueue = new PersistentQueueBindingEncoding();
undefinedQueue.setId(ref.refEncoding.queueID);
undefinedQueue.replaceQueueName(SimpleString.of(undefinedPrefix + ref.refEncoding.queueID));
undefinedQueue.replaceAddress(undefinedQueue.getQueueName());
queueBindings.put(undefinedQueue.getId(), undefinedQueue);
queueName = String.valueOf(undefinedQueue.getQueueName());
getActionContext().err.println("Queue ID " + ref.refEncoding.queueID + " not defined. Exporting it as " + undefinedQueue.getQueueName());
} else {
queueName = String.valueOf(persistentQueueBindingEncoding.getQueueName());
}
queues.add(queueName);
}
return queues;
}

View File

@ -288,8 +288,23 @@ public final class XmlDataImporter extends ConnectionConfigurationAbtract {
}
}
private void createUndefinedQueue(String name, RoutingType routingType) throws Exception {
ClientSession.QueueQuery queueQuery = managementSession.queueQuery(SimpleString.of(name));
if (!queueQuery.isExists()) {
managementSession.createQueue(QueueConfiguration.of(name).setRoutingType(routingType).setDurable(true).setAutoCreateAddress(true));
}
}
private void sendMessage(List<String> queues, Message message) throws Exception {
final String destination = addressMap.get(queues.get(0));
String destination = addressMap.get(queues.get(0));
if (destination == null) {
createUndefinedQueue(queues.get(0), message.getRoutingType());
destination = queues.get(0);
addressMap.put(queues.get(0), queues.get(0));
}
final ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
final boolean debugLog = logger.isDebugEnabled();

View File

@ -289,6 +289,9 @@ public final class ManagementHelper {
* Returns whether the invocation of the management operation on the server resource succeeded.
*/
public static boolean hasOperationSucceeded(final Message message) {
if (message == null) {
return false;
}
if (!ManagementHelper.isOperationResult(message)) {
return false;
}

View File

@ -186,6 +186,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.name = newName;
}
public void replaceAddress(SimpleString address) {
this.address = address;
}
@Override
public SimpleString getFilterString() {
return filterString;

View File

@ -27,11 +27,15 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.EnumSet;
import java.util.UUID;
@ -48,6 +52,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator;
@ -55,11 +60,13 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.unit.util.InVMContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -1269,4 +1276,116 @@ public class XmlImportExportTest extends ActiveMQTestBase {
locator.close();
server.stop();
}
@Test
public void testRemovedQueue() throws Exception {
String undefinedPrefix = "undef_" + RandomUtil.randomString() + "_";
final int numberOfMessages = 100;
server = createServer(true, true);
server.start();
forceLong();
String anycastQueueName = getTestClassName() + RandomUtil.randomString();
String multicastQueueName = getTestClassName() + RandomUtil.randomString();
createAnycastPair(server, anycastQueueName);
server.addAddressInfo(new AddressInfo(multicastQueueName).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false));
server.createQueue(QueueConfiguration.of(multicastQueueName).setRoutingType(RoutingType.MULTICAST).setAddress(multicastQueueName));
org.apache.activemq.artemis.core.server.Queue anycastServerQueue = server.locateQueue(anycastQueueName);
assertNotNull(anycastServerQueue);
assertEquals(RoutingType.ANYCAST, anycastServerQueue.getRoutingType());
org.apache.activemq.artemis.core.server.Queue multiCastServerQueue = server.locateQueue(multicastQueueName);
assertNotNull(multiCastServerQueue);
assertEquals(RoutingType.MULTICAST, multiCastServerQueue.getRoutingType());
{
ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(multicastQueueName);
Queue queue = session.createQueue(anycastQueueName);
try (MessageProducer producer = session.createProducer(queue)) {
for (int i = 0; i < numberOfMessages; i++) {
producer.send(session.createTextMessage("hello " + i));
}
}
try (MessageProducer producer = session.createProducer(topic)) {
for (int i = 0; i < numberOfMessages; i++) {
producer.send(session.createTextMessage("hello " + i));
}
}
session.commit();
}
}
// this is forcing a situation where the queue was removed and the messages are still in the journal
removeAddressAndQueue(anycastServerQueue);
removeAddressAndQueue(multiCastServerQueue);
server.stop();
final String fileName = "test.out";
FileOutputStream fileOutputStream = new FileOutputStream(new File(getTestDir(), fileName));
BufferedOutputStream bufferOut = new BufferedOutputStream(fileOutputStream);
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.setUndefinedPrefix(undefinedPrefix);
// the journal should still export even though the bindings don't exist any more
// this is to "facilitate" users recovering or undoing mistakes
xmlDataExporter.process(bufferOut, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
bufferOut.close();
assertNull(xmlDataExporter.getLastError());
server.start();
XmlDataImporter importer = new XmlDataImporter();
importer.input = new File(getTestDir(), fileName).getAbsolutePath();
importer.execute(new ActionContext());
{
ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue anycastJMSQueue = session.createQueue(undefinedPrefix + anycastServerQueue.getID());
Queue multicastJMSQueue = session.createQueue(undefinedPrefix + multiCastServerQueue.getID() + "::" + undefinedPrefix + multiCastServerQueue.getID());
try (MessageConsumer consumer = session.createConsumer(anycastJMSQueue)) {
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
assertEquals("hello " + i, message.getText());
}
}
try (MessageConsumer consumer = session.createConsumer(multicastJMSQueue)) {
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
assertEquals("hello " + i, message.getText());
}
}
session.commit();
}
}
}
private void removeAddressAndQueue(org.apache.activemq.artemis.core.server.Queue serverQueue) throws Exception {
AddressInfo addressInfo = server.getAddressInfo(serverQueue.getAddress());
long tx = server.getStorageManager().generateID();
server.getStorageManager().deleteAddressBinding(tx, addressInfo.getId());
server.getStorageManager().deleteQueueBinding(tx, serverQueue.getID());
server.getStorageManager().commitBindings(tx);
}
}