ARTEMIS-1840 Added FQQN Import/Export Live Broker

This commit is contained in:
Martyn Taylor 2018-05-02 11:35:17 +01:00 committed by Clebert Suconic
parent 812776fca7
commit 64ce26e7cc
8 changed files with 826 additions and 55 deletions

View File

@ -20,11 +20,16 @@ package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import java.io.FileOutputStream;
import java.io.OutputStream;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
@Command(name = "consumer", description = "It will consume messages from an instance") @Command(name = "consumer", description = "It will consume messages from an instance")
public class Consumer extends DestAbstract { public class Consumer extends DestAbstract {
@ -41,6 +46,9 @@ public class Consumer extends DestAbstract {
@Option(name = "--filter", description = "filter to be used with the consumer") @Option(name = "--filter", description = "filter to be used with the consumer")
String filter; String filter;
@Option(name = "--data", description = "serialize the messages to the specified file as they are consumed")
String file;
@Override @Override
public Object execute(ActionContext context) throws Exception { public Object execute(ActionContext context) throws Exception {
super.execute(context); super.execute(context);
@ -49,7 +57,34 @@ public class Consumer extends DestAbstract {
ConnectionFactory factory = createConnectionFactory(); ConnectionFactory factory = createConnectionFactory();
SerialiserMessageListener listener = null;
MessageSerializer messageSerializer = null;
if (file != null) {
try {
String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER : serializer;
if (className.equals(DEFAULT_MESSAGE_SERIALIZER) && !protocol.equalsIgnoreCase("CORE")) {
System.err.println("Default Serializer does not support: " + protocol + " protocol");
return null;
}
messageSerializer = (MessageSerializer) Class.forName(className).getConstructor().newInstance();
} catch (Exception e) {
System.err.println("Error. Unable to instantiate serializer class: " + serializer);
return null;
}
try {
OutputStream out = new FileOutputStream(file);
listener = new SerialiserMessageListener(messageSerializer, out);
} catch (Exception e) {
System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
return null;
}
}
if (messageSerializer != null) messageSerializer.start();
try (Connection connection = factory.createConnection()) { try (Connection connection = factory.createConnection()) {
// We read messages in a single thread when persisting to file.
ConsumerThread[] threadsArray = new ConsumerThread[threads]; ConsumerThread[] threadsArray = new ConsumerThread[threads];
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
Session session; Session session;
@ -58,10 +93,13 @@ public class Consumer extends DestAbstract {
} else { } else {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} }
Destination dest = lookupDestination(session);
// Do validation on FQQN
Destination dest = isFQQN() ? session.createQueue(getFQQNFromDestination(destination)) : lookupDestination(session);
threadsArray[i] = new ConsumerThread(session, dest, i); threadsArray[i] = new ConsumerThread(session, dest, i);
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false); threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener);
} }
for (ConsumerThread thread : threadsArray) { for (ConsumerThread thread : threadsArray) {
@ -77,9 +115,24 @@ public class Consumer extends DestAbstract {
received += thread.getReceived(); received += thread.getReceived();
} }
if (messageSerializer != null) messageSerializer.stop();
return received; return received;
} }
} }
private class SerialiserMessageListener implements MessageListener {
private MessageSerializer messageSerializer;
SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception {
this.messageSerializer = messageSerializer;
this.messageSerializer.setOutput(outputStream);
}
@Override
public void onMessage(Message message) {
messageSerializer.write(message);
}
}
} }

View File

@ -21,6 +21,7 @@ import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage; import javax.jms.ObjectMessage;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueBrowser; import javax.jms.QueueBrowser;
@ -50,6 +51,7 @@ public class ConsumerThread extends Thread {
boolean running = false; boolean running = false;
CountDownLatch finished; CountDownLatch finished;
boolean bytesAsText; boolean bytesAsText;
MessageListener listener;
public ConsumerThread(Session session, Destination destination, int threadNr) { public ConsumerThread(Session session, Destination destination, int threadNr) {
super("Consumer " + destination.toString() + ", thread=" + threadNr); super("Consumer " + destination.toString() + ", thread=" + threadNr);
@ -66,6 +68,43 @@ public class ConsumerThread extends Thread {
} }
} }
private void handle(Message msg, boolean browse) throws JMSException {
if (listener != null) {
listener.onMessage(msg);
} else {
if (browse) {
if (verbose) {
System.out.println("..." + msg);
}
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
System.out.println("Message:" + msg);
}
} else {
if (verbose) {
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
System.out.println("Received a message with " + bytes.length);
}
if (msg instanceof TextMessage) {
String text = ((TextMessage) msg).getText();
System.out.println("Received text sized at " + text.length());
}
if (msg instanceof ObjectMessage) {
Object obj = ((ObjectMessage) msg).getObject();
System.out.println("Received object " + obj.toString().length());
}
}
}
}
}
public void browse() { public void browse() {
running = true; running = true;
QueueBrowser consumer = null; QueueBrowser consumer = null;
@ -83,16 +122,7 @@ public class ConsumerThread extends Thread {
Message msg = enumBrowse.nextElement(); Message msg = enumBrowse.nextElement();
if (msg != null) { if (msg != null) {
System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
handle(msg, true);
if (verbose) {
System.out.println("..." + msg);
}
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
System.out.println("Message:" + msg);
}
received++; received++;
if (received >= messageCount) { if (received >= messageCount) {
@ -158,24 +188,7 @@ public class ConsumerThread extends Thread {
System.out.println("Received " + count); System.out.println("Received " + count);
} }
} }
if (verbose) { handle(msg, false);
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
System.out.println("Received a message with " + bytes.length);
}
if (msg instanceof TextMessage) {
String text = ((TextMessage) msg).getText();
System.out.println("Received text sized at " + text.length());
}
if (msg instanceof ObjectMessage) {
Object obj = ((ObjectMessage) msg).getObject();
System.out.println("Received object " + obj.toString().length());
}
}
received++; received++;
} else { } else {
if (breakOnNull) { if (breakOnNull) {
@ -334,4 +347,8 @@ public class ConsumerThread extends Thread {
this.browse = browse; this.browse = browse;
return this; return this;
} }
public void setListener(MessageListener listener) {
this.listener = listener;
}
} }

View File

@ -19,12 +19,30 @@ package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Session; import javax.jms.Session;
import java.nio.ByteBuffer;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
public class DestAbstract extends ConnectionAbstract { public class DestAbstract extends ConnectionAbstract {
public static final String DEFAULT_MESSAGE_SERIALIZER = "org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer";
private static final String FQQN_PREFIX = "fqqn://";
private static final String FQQN_SEPERATOR = "::";
@Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// (Default: queue://TEST)") @Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// (Default: queue://TEST)")
String destination = "queue://TEST"; String destination = "queue://TEST";
@ -40,6 +58,25 @@ public class DestAbstract extends ConnectionAbstract {
@Option(name = "--threads", description = "Number of Threads to be used (Default: 1)") @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
int threads = 1; int threads = 1;
@Option(name = "--serializer", description = "Override the default serializer with a custom implementation")
String serializer;
protected boolean isFQQN() throws ActiveMQException {
boolean fqqn = destination.contains("::");
if (fqqn) {
if (!destination.startsWith("fqqn://")) {
throw new ActiveMQException("FQQN destinations must start with the fqqn:// prefix");
}
if (protocol.equalsIgnoreCase("AMQP")) {
throw new ActiveMQException("Sending to FQQN destinations is not support via AMQP protocol");
}
return true;
} else {
return false;
}
}
protected Destination lookupDestination(Session session) throws Exception { protected Destination lookupDestination(Session session) throws Exception {
if (protocol.equals("AMQP")) { if (protocol.equals("AMQP")) {
return session.createQueue(destination); return session.createQueue(destination);
@ -48,4 +85,63 @@ public class DestAbstract extends ConnectionAbstract {
} }
} }
protected MessageSerializer getMessageSerializer() {
if (serializer == null) return new XMLMessageSerializer();
try {
return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance();
} catch (Exception e) {
System.out.println("Error: unable to instantiate serializer class: " + serializer);
System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER);
}
return new XMLMessageSerializer();
}
// FIXME We currently do not support producing to FQQN. This is a work around.
private ClientSession getManagementSession() throws Exception {
ServerLocator serverLocator = ActiveMQClient.createServerLocator(brokerURL);
ClientSessionFactory sf = serverLocator.createSessionFactory();
ClientSession managementSession;
if (user != null || password != null) {
managementSession = sf.createSession(user, password, false, true, true, false, 0);
} else {
managementSession = sf.createSession(false, true, true);
}
return managementSession;
}
public byte[] getQueueIdFromName(String queueName) throws Exception {
ClientMessage message = getQueueAttribute(queueName, "ID");
Number idObject = (Number) ManagementHelper.getResult(message);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
byteBuffer.putLong(idObject.longValue());
return byteBuffer.array();
}
protected ClientMessage getQueueAttribute(String queueName, String attribute) throws Exception {
ClientSession managementSession = getManagementSession();
managementSession.start();
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queueName, attribute);
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
return reply;
} finally {
managementSession.stop();
}
}
protected String getQueueFromFQQN(String fqqn) {
return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) + FQQN_SEPERATOR.length());
}
protected String getAddressFromFQQN(String fqqn) {
return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(), fqqn.indexOf(FQQN_SEPERATOR));
}
protected String getFQQNFromDestination(String destination) {
return destination.substring(destination.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length());
}
} }

View File

@ -19,12 +19,21 @@ package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import java.io.FileInputStream;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
@Command(name = "producer", description = "It will send messages to an instance") @Command(name = "producer", description = "It will send messages to an instance")
public class Producer extends DestAbstract { public class Producer extends DestAbstract {
@ -49,6 +58,9 @@ public class Producer extends DestAbstract {
@Option(name = "--group", description = "Message Group to be used") @Option(name = "--group", description = "Message Group to be used")
String msgGroupID = null; String msgGroupID = null;
@Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.")
String fileName = null;
@Override @Override
public Object execute(ActionContext context) throws Exception { public Object execute(ActionContext context) throws Exception {
super.execute(context); super.execute(context);
@ -56,6 +68,47 @@ public class Producer extends DestAbstract {
ConnectionFactory factory = createConnectionFactory(); ConnectionFactory factory = createConnectionFactory();
try (Connection connection = factory.createConnection()) { try (Connection connection = factory.createConnection()) {
byte[] queueId = null;
boolean isFQQN = isFQQN();
if (isFQQN) {
queueId = getQueueIdFromName(getQueueFromFQQN(destination));
}
// If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation.
if (fileName != null) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination dest = lookupDestination(session, isFQQN);
MessageProducer producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
int messageCount = 0;
try {
MessageSerializer serializer = getMessageSerializer();
serializer.setInput(new FileInputStream(fileName), session);
serializer.start();
Message message = serializer.read();
while (message != null) {
if (queueId != null) ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
producer.send(message);
message = serializer.read();
messageCount++;
}
session.commit();
serializer.stop();
} catch (Exception e) {
System.err.println("Error occurred during import. Rolling back.");
session.rollback();
e.printStackTrace();
return 0;
}
System.out.println("Sent " + messageCount + " Messages.");
return messageCount;
} else {
ProducerThread[] threadsArray = new ProducerThread[threads]; ProducerThread[] threadsArray = new ProducerThread[threads];
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
Session session; Session session;
@ -64,13 +117,13 @@ public class Producer extends DestAbstract {
} else { } else {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} }
Destination dest = lookupDestination(session); Destination dest = lookupDestination(session, isFQQN);
threadsArray[i] = new ProducerThread(session, dest, i); threadsArray[i] = new ProducerThread(session, dest, i);
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize). setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize).
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
setMessageCount(messageCount); setMessageCount(messageCount).setQueueId(queueId);
} }
for (ProducerThread thread : threadsArray) { for (ProducerThread thread : threadsArray) {
@ -82,9 +135,33 @@ public class Producer extends DestAbstract {
thread.join(); thread.join();
messagesProduced += thread.getSentCount(); messagesProduced += thread.getSentCount();
} }
return messagesProduced; return messagesProduced;
} }
} }
}
public Destination lookupDestination(Session session, boolean isFQQN) throws Exception {
Destination dest;
if (!isFQQN) {
dest = lookupDestination(session);
} else {
String address = getAddressFromFQQN(destination);
if (isFQQNAnycast(getQueueFromFQQN(destination))) {
String queue = getQueueFromFQQN(destination);
if (!queue.equals(address)) {
throw new ActiveMQException("FQQN support is limited to Anycast queues where the queue name equals the address.");
}
dest = session.createQueue(address);
} else {
dest = session.createTopic(address);
}
}
return dest;
}
protected boolean isFQQNAnycast(String queueName) throws Exception {
ClientMessage message = getQueueAttribute(queueName, "RoutingType");
String routingType = (String) ManagementHelper.getResult(message);
return routingType.equalsIgnoreCase("anycast");
}
} }

View File

@ -30,6 +30,7 @@ import java.io.InputStreamReader;
import java.net.URL; import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
public class ProducerThread extends Thread { public class ProducerThread extends Thread {
@ -48,6 +49,7 @@ public class ProducerThread extends Thread {
long msgTTL = 0L; long msgTTL = 0L;
String msgGroupID = null; String msgGroupID = null;
int transactionBatchSize; int transactionBatchSize;
byte[] queueId = null;
int transactions = 0; int transactions = 0;
final AtomicInteger sentCount = new AtomicInteger(0); final AtomicInteger sentCount = new AtomicInteger(0);
@ -121,6 +123,11 @@ public class ProducerThread extends Thread {
private void sendMessage(MessageProducer producer, String threadName) throws Exception { private void sendMessage(MessageProducer producer, String threadName) throws Exception {
Message message = createMessage(sentCount.get(), threadName); Message message = createMessage(sentCount.get(), threadName);
if (queueId != null) {
((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
}
producer.send(message); producer.send(message);
if (verbose) { if (verbose) {
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID())); System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
@ -370,4 +377,8 @@ public class ProducerThread extends Thread {
this.objectSize = objectSize; this.objectSize = objectSize;
return this; return this;
} }
public void setQueueId(byte[] queueId) {
this.queueId = queueId;
}
} }

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.factory.serialize;
import javax.jms.Message;
import javax.jms.Session;
import java.io.InputStream;
import java.io.OutputStream;
public interface MessageSerializer {
Message read() throws Exception;
void write(Message message);
void setOutput(OutputStream out) throws Exception;
void setInput(InputStream in, Session session) throws Exception;
void start() throws Exception;
void stop() throws Exception;
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.factory.serialize;
import javax.jms.Message;
import javax.jms.Session;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamReader;
import javax.xml.stream.XMLStreamWriter;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Proxy;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageExporter;
import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageImporter;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
public class XMLMessageSerializer implements MessageSerializer {
private XMLMessageExporter writer;
private XMLMessageImporter reader;
private ClientSession clientSession;
private OutputStream out;
@Override
public synchronized Message read() throws Exception {
reader.getRawXMLReader().nextTag();
// End of document.
if (reader.getRawXMLReader().getLocalName().equals("messages")) return null;
XMLMessageImporter.MessageInfo messageInfo = reader.readMessage(true);
if (messageInfo == null) return null;
// This is a large message
ActiveMQMessage jmsMessage = new ActiveMQMessage((ClientMessage) messageInfo.message, clientSession);
if (messageInfo.tempFile != null) {
jmsMessage.setInputStream(new FileInputStream(messageInfo.tempFile));
}
return jmsMessage;
}
@Override
public synchronized void write(Message message) {
try {
ICoreMessage core = ((ActiveMQMessage) message).getCoreMessage();
writer.printSingleMessageAsXML(core, null, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void setOutput(OutputStream outputStream) throws Exception {
this.out = outputStream;
XMLOutputFactory factory = XMLOutputFactory.newInstance();
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(outputStream, "UTF-8");
XmlDataExporter.PrettyPrintHandler handler = new XmlDataExporter.PrettyPrintHandler(rawXmlWriter);
XMLStreamWriter xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
this.writer = new XMLMessageExporter(xmlWriter);
}
@Override
public void setInput(InputStream inputStream, Session session) throws Exception {
XMLStreamReader streamReader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
this.clientSession = ((ActiveMQSession) session).getCoreSession();
this.reader = new XMLMessageImporter(streamReader, clientSession);
}
@Override
public synchronized void start() throws Exception {
if (writer != null) {
writer.getRawXMLWriter().writeStartDocument(XmlDataConstants.XML_VERSION);
writer.getRawXMLWriter().writeStartElement(XmlDataConstants.MESSAGES_PARENT);
}
if (reader != null) {
// <messages>
reader.getRawXMLReader().nextTag();
}
}
@Override
public synchronized void stop() throws Exception {
if (writer != null) {
writer.getRawXMLWriter().writeEndElement();
writer.getRawXMLWriter().writeEndDocument();
writer.getRawXMLWriter().flush();
writer.getRawXMLWriter().close();
out.flush();
}
}
}

View File

@ -0,0 +1,362 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.cli.test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Test to validate that the CLI doesn't throw improper exceptions when invoked.
*/
public class MessageSerializerTest extends CliTestBase {
private Connection connection;
@Before
@Override
public void setup() throws Exception {
setupAuth();
super.setup();
startServer();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = cf.createConnection("admin", "admin");
}
@After
@Override
public void tearDown() throws Exception {
try {
connection.close();
} finally {
stopServer();
super.tearDown();
}
}
private void setupAuth() throws Exception {
setupAuth(temporaryFolder.getRoot());
}
private void setupAuth(File folder) throws Exception {
System.setProperty("java.security.auth.login.config", folder.getAbsolutePath() + "/etc/login.config");
}
private void startServer() throws Exception {
File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
setupAuth(rootDirectory);
Run.setEmbedded(true);
Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login");
System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
Artemis.internalExecute("run");
}
private void stopServer() throws Exception {
Artemis.internalExecute("stop");
assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
assertEquals(0, LibaioContext.getTotalMaxIO());
}
private File createMessageFile() throws IOException {
return temporaryFolder.newFile("messages.xml");
}
@Test
public void testTextMessageImportExport() throws Exception {
String address = "test";
int noMessages = 10;
File file = createMessageFile();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
List<Message> sent = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
sent.add(session.createTextMessage(RandomUtil.randomString()));
}
sendMessages(session, address, sent);
exportMessages(address, noMessages, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, noMessages, false);
for (int i = 0; i < noMessages; i++) {
assertEquals(((TextMessage) sent.get(i)).getText(), ((TextMessage) received.get(i)).getText());
}
}
@Test
public void testObjectMessageImportExport() throws Exception {
String address = "test";
int noMessages = 10;
File file = createMessageFile();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Send initial messages.
List<Message> sent = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
sent.add(session.createObjectMessage(UUID.randomUUID()));
}
sendMessages(session, address, sent);
exportMessages(address, noMessages, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, noMessages, false);
for (int i = 0; i < noMessages; i++) {
assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
}
}
@Test
public void testMapMessageImportExport() throws Exception {
String address = "test";
int noMessages = 10;
String key = "testKey";
File file = createMessageFile();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
List<Message> sent = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
MapMessage m = session.createMapMessage();
m.setString(key, RandomUtil.randomString());
sent.add(m);
}
sendMessages(session, address, sent);
exportMessages(address, noMessages, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
importMessages(address, file);
List<Message> received = consumeMessages(session, address, noMessages, false);
for (int i = 0; i < noMessages; i++) {
assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
}
}
private void sendMessages(Session session, String address, List<Message> messages) throws Exception {
MessageProducer producer = session.createProducer(getDestination(address));
for (Message m : messages) {
producer.send(m);
}
}
private void sendMessages(Session session, Destination destination, List<Message> messages) throws Exception {
MessageProducer producer = session.createProducer(destination);
for (Message m : messages) {
producer.send(m);
}
}
private List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
MessageConsumer consumer = session.createConsumer(destination);
List<Message> messages = new ArrayList<>();
for (int i = 0; i < noMessages; i++) {
Message m = consumer.receive(1000);
assertNotNull(m);
messages.add(m);
}
return messages;
}
private void exportMessages(String address, int noMessages, File output) throws Exception {
Artemis.main("consumer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message-count", "" + noMessages,
"--data", output.getAbsolutePath());
}
private void importMessages(String address, File input) throws Exception {
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--data", input.getAbsolutePath());
}
private void createQueue(String routingTypeOption, String address, String queueName) throws Exception {
Artemis.main("queue", "create",
"--user", "admin",
"--password", "admin",
"--address", address,
"--name", queueName,
routingTypeOption,
"--durable",
"--preserve-on-no-consumers",
"--auto-create-address");
}
@Test
public void testSendDirectToQueue() throws Exception {
String address = "test";
String queue1Name = "queue1";
String queue2Name = "queue2";
createQueue("--multicast", address, queue1Name);
createQueue("--multicast", address, queue2Name);
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
// send messages to queue
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Destination queue1 = session.createQueue(address + "::" + queue1Name);
Destination queue2 = session.createQueue(address + "::" + queue2Name);
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", "fqqn://" + address + "::" + queue1Name,
"--message-count", "5");
assertNull(consumer2.receive(1000));
assertNotNull(consumer1.receive(1000));
}
}
@Test
public void exportFromFQQN() throws Exception {
String addr = "address";
String queue = "queue";
String fqqn = addr + "::" + queue;
String destination = "fqqn://" + fqqn;
File file = createMessageFile();
int noMessages = 10;
createQueue("--multicast", addr, queue);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Topic topic = session.createTopic(addr);
List<Message> messages = new ArrayList<>(noMessages);
for (int i = 0; i < noMessages; i++) {
messages.add(session.createTextMessage(RandomUtil.randomString()));
}
sendMessages(session, topic, messages);
exportMessages(destination, noMessages, file);
importMessages(destination, file);
List<Message> recieved = consumeMessages(session, fqqn, noMessages, true);
for (int i = 0; i < noMessages; i++) {
assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
}
}
//read individual lines from byteStream
private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
byte[] bytes;
if (errorOutput) {
bytes = context.getStdErrBytes();
} else {
bytes = context.getStdoutBytes();
}
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
ArrayList<String> lines = new ArrayList<>();
String currentLine = bufferedReader.readLine();
while (currentLine != null) {
lines.add(currentLine);
currentLine = bufferedReader.readLine();
}
return lines;
}
private void sendMessages(Session session, String queueName, int messageCount) throws JMSException {
MessageProducer producer = session.createProducer(getDestination(queueName));
TextMessage message = session.createTextMessage(getTestMessageBody());
for (int i = 0; i < messageCount; i++) {
producer.send(message);
}
}
private String getTestMessageBody() {
return "Sample Message";
}
private Destination getDestination(String queueName) {
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
}
}