This closes #2056
This commit is contained in:
commit
ab9f5128bc
|
@ -20,11 +20,16 @@ package org.apache.activemq.artemis.cli.commands.messages;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
import io.airlift.airline.Option;
|
import io.airlift.airline.Option;
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
|
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
|
||||||
|
|
||||||
@Command(name = "consumer", description = "It will consume messages from an instance")
|
@Command(name = "consumer", description = "It will consume messages from an instance")
|
||||||
public class Consumer extends DestAbstract {
|
public class Consumer extends DestAbstract {
|
||||||
|
@ -41,6 +46,9 @@ public class Consumer extends DestAbstract {
|
||||||
@Option(name = "--filter", description = "filter to be used with the consumer")
|
@Option(name = "--filter", description = "filter to be used with the consumer")
|
||||||
String filter;
|
String filter;
|
||||||
|
|
||||||
|
@Option(name = "--data", description = "serialize the messages to the specified file as they are consumed")
|
||||||
|
String file;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
super.execute(context);
|
super.execute(context);
|
||||||
|
@ -49,7 +57,34 @@ public class Consumer extends DestAbstract {
|
||||||
|
|
||||||
ConnectionFactory factory = createConnectionFactory();
|
ConnectionFactory factory = createConnectionFactory();
|
||||||
|
|
||||||
|
SerialiserMessageListener listener = null;
|
||||||
|
MessageSerializer messageSerializer = null;
|
||||||
|
if (file != null) {
|
||||||
|
try {
|
||||||
|
String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER : serializer;
|
||||||
|
if (className.equals(DEFAULT_MESSAGE_SERIALIZER) && !protocol.equalsIgnoreCase("CORE")) {
|
||||||
|
System.err.println("Default Serializer does not support: " + protocol + " protocol");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
messageSerializer = (MessageSerializer) Class.forName(className).getConstructor().newInstance();
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Error. Unable to instantiate serializer class: " + serializer);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
OutputStream out = new FileOutputStream(file);
|
||||||
|
listener = new SerialiserMessageListener(messageSerializer, out);
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (messageSerializer != null) messageSerializer.start();
|
||||||
|
|
||||||
try (Connection connection = factory.createConnection()) {
|
try (Connection connection = factory.createConnection()) {
|
||||||
|
// We read messages in a single thread when persisting to file.
|
||||||
ConsumerThread[] threadsArray = new ConsumerThread[threads];
|
ConsumerThread[] threadsArray = new ConsumerThread[threads];
|
||||||
for (int i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
Session session;
|
Session session;
|
||||||
|
@ -58,10 +93,13 @@ public class Consumer extends DestAbstract {
|
||||||
} else {
|
} else {
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
}
|
}
|
||||||
Destination dest = lookupDestination(session);
|
|
||||||
|
// Do validation on FQQN
|
||||||
|
Destination dest = isFQQN() ? session.createQueue(getFQQNFromDestination(destination)) : lookupDestination(session);
|
||||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||||
|
|
||||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false);
|
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
|
||||||
|
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ConsumerThread thread : threadsArray) {
|
for (ConsumerThread thread : threadsArray) {
|
||||||
|
@ -77,9 +115,24 @@ public class Consumer extends DestAbstract {
|
||||||
received += thread.getReceived();
|
received += thread.getReceived();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (messageSerializer != null) messageSerializer.stop();
|
||||||
|
|
||||||
return received;
|
return received;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class SerialiserMessageListener implements MessageListener {
|
||||||
|
|
||||||
|
private MessageSerializer messageSerializer;
|
||||||
|
|
||||||
|
SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception {
|
||||||
|
this.messageSerializer = messageSerializer;
|
||||||
|
this.messageSerializer.setOutput(outputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
messageSerializer.write(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.ObjectMessage;
|
import javax.jms.ObjectMessage;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.QueueBrowser;
|
import javax.jms.QueueBrowser;
|
||||||
|
@ -50,6 +51,7 @@ public class ConsumerThread extends Thread {
|
||||||
boolean running = false;
|
boolean running = false;
|
||||||
CountDownLatch finished;
|
CountDownLatch finished;
|
||||||
boolean bytesAsText;
|
boolean bytesAsText;
|
||||||
|
MessageListener listener;
|
||||||
|
|
||||||
public ConsumerThread(Session session, Destination destination, int threadNr) {
|
public ConsumerThread(Session session, Destination destination, int threadNr) {
|
||||||
super("Consumer " + destination.toString() + ", thread=" + threadNr);
|
super("Consumer " + destination.toString() + ", thread=" + threadNr);
|
||||||
|
@ -66,6 +68,43 @@ public class ConsumerThread extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handle(Message msg, boolean browse) throws JMSException {
|
||||||
|
if (listener != null) {
|
||||||
|
listener.onMessage(msg);
|
||||||
|
} else {
|
||||||
|
if (browse) {
|
||||||
|
if (verbose) {
|
||||||
|
System.out.println("..." + msg);
|
||||||
|
}
|
||||||
|
if (bytesAsText && (msg instanceof BytesMessage)) {
|
||||||
|
long length = ((BytesMessage) msg).getBodyLength();
|
||||||
|
byte[] bytes = new byte[(int) length];
|
||||||
|
((BytesMessage) msg).readBytes(bytes);
|
||||||
|
System.out.println("Message:" + msg);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (verbose) {
|
||||||
|
if (bytesAsText && (msg instanceof BytesMessage)) {
|
||||||
|
long length = ((BytesMessage) msg).getBodyLength();
|
||||||
|
byte[] bytes = new byte[(int) length];
|
||||||
|
((BytesMessage) msg).readBytes(bytes);
|
||||||
|
System.out.println("Received a message with " + bytes.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msg instanceof TextMessage) {
|
||||||
|
String text = ((TextMessage) msg).getText();
|
||||||
|
System.out.println("Received text sized at " + text.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msg instanceof ObjectMessage) {
|
||||||
|
Object obj = ((ObjectMessage) msg).getObject();
|
||||||
|
System.out.println("Received object " + obj.toString().length());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void browse() {
|
public void browse() {
|
||||||
running = true;
|
running = true;
|
||||||
QueueBrowser consumer = null;
|
QueueBrowser consumer = null;
|
||||||
|
@ -83,16 +122,7 @@ public class ConsumerThread extends Thread {
|
||||||
Message msg = enumBrowse.nextElement();
|
Message msg = enumBrowse.nextElement();
|
||||||
if (msg != null) {
|
if (msg != null) {
|
||||||
System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||||
|
handle(msg, true);
|
||||||
if (verbose) {
|
|
||||||
System.out.println("..." + msg);
|
|
||||||
}
|
|
||||||
if (bytesAsText && (msg instanceof BytesMessage)) {
|
|
||||||
long length = ((BytesMessage) msg).getBodyLength();
|
|
||||||
byte[] bytes = new byte[(int) length];
|
|
||||||
((BytesMessage) msg).readBytes(bytes);
|
|
||||||
System.out.println("Message:" + msg);
|
|
||||||
}
|
|
||||||
received++;
|
received++;
|
||||||
|
|
||||||
if (received >= messageCount) {
|
if (received >= messageCount) {
|
||||||
|
@ -158,24 +188,7 @@ public class ConsumerThread extends Thread {
|
||||||
System.out.println("Received " + count);
|
System.out.println("Received " + count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (verbose) {
|
handle(msg, false);
|
||||||
if (bytesAsText && (msg instanceof BytesMessage)) {
|
|
||||||
long length = ((BytesMessage) msg).getBodyLength();
|
|
||||||
byte[] bytes = new byte[(int) length];
|
|
||||||
((BytesMessage) msg).readBytes(bytes);
|
|
||||||
System.out.println("Received a message with " + bytes.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg instanceof TextMessage) {
|
|
||||||
String text = ((TextMessage) msg).getText();
|
|
||||||
System.out.println("Received text sized at " + text.length());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg instanceof ObjectMessage) {
|
|
||||||
Object obj = ((ObjectMessage) msg).getObject();
|
|
||||||
System.out.println("Received object " + obj.toString().length());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
received++;
|
received++;
|
||||||
} else {
|
} else {
|
||||||
if (breakOnNull) {
|
if (breakOnNull) {
|
||||||
|
@ -334,4 +347,8 @@ public class ConsumerThread extends Thread {
|
||||||
this.browse = browse;
|
this.browse = browse;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setListener(MessageListener listener) {
|
||||||
|
this.listener = listener;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,12 +19,30 @@ package org.apache.activemq.artemis.cli.commands.messages;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import io.airlift.airline.Option;
|
import io.airlift.airline.Option;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
|
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||||
|
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
|
||||||
|
import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
|
|
||||||
public class DestAbstract extends ConnectionAbstract {
|
public class DestAbstract extends ConnectionAbstract {
|
||||||
|
|
||||||
|
public static final String DEFAULT_MESSAGE_SERIALIZER = "org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer";
|
||||||
|
|
||||||
|
private static final String FQQN_PREFIX = "fqqn://";
|
||||||
|
|
||||||
|
private static final String FQQN_SEPERATOR = "::";
|
||||||
|
|
||||||
@Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// (Default: queue://TEST)")
|
@Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// (Default: queue://TEST)")
|
||||||
String destination = "queue://TEST";
|
String destination = "queue://TEST";
|
||||||
|
|
||||||
|
@ -40,6 +58,25 @@ public class DestAbstract extends ConnectionAbstract {
|
||||||
@Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
|
@Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
|
||||||
int threads = 1;
|
int threads = 1;
|
||||||
|
|
||||||
|
@Option(name = "--serializer", description = "Override the default serializer with a custom implementation")
|
||||||
|
String serializer;
|
||||||
|
|
||||||
|
protected boolean isFQQN() throws ActiveMQException {
|
||||||
|
boolean fqqn = destination.contains("::");
|
||||||
|
if (fqqn) {
|
||||||
|
if (!destination.startsWith("fqqn://")) {
|
||||||
|
throw new ActiveMQException("FQQN destinations must start with the fqqn:// prefix");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (protocol.equalsIgnoreCase("AMQP")) {
|
||||||
|
throw new ActiveMQException("Sending to FQQN destinations is not support via AMQP protocol");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected Destination lookupDestination(Session session) throws Exception {
|
protected Destination lookupDestination(Session session) throws Exception {
|
||||||
if (protocol.equals("AMQP")) {
|
if (protocol.equals("AMQP")) {
|
||||||
return session.createQueue(destination);
|
return session.createQueue(destination);
|
||||||
|
@ -48,4 +85,63 @@ public class DestAbstract extends ConnectionAbstract {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected MessageSerializer getMessageSerializer() {
|
||||||
|
if (serializer == null) return new XMLMessageSerializer();
|
||||||
|
try {
|
||||||
|
return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance();
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.out.println("Error: unable to instantiate serializer class: " + serializer);
|
||||||
|
System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER);
|
||||||
|
}
|
||||||
|
return new XMLMessageSerializer();
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME We currently do not support producing to FQQN. This is a work around.
|
||||||
|
private ClientSession getManagementSession() throws Exception {
|
||||||
|
ServerLocator serverLocator = ActiveMQClient.createServerLocator(brokerURL);
|
||||||
|
ClientSessionFactory sf = serverLocator.createSessionFactory();
|
||||||
|
|
||||||
|
ClientSession managementSession;
|
||||||
|
if (user != null || password != null) {
|
||||||
|
managementSession = sf.createSession(user, password, false, true, true, false, 0);
|
||||||
|
} else {
|
||||||
|
managementSession = sf.createSession(false, true, true);
|
||||||
|
}
|
||||||
|
return managementSession;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getQueueIdFromName(String queueName) throws Exception {
|
||||||
|
ClientMessage message = getQueueAttribute(queueName, "ID");
|
||||||
|
Number idObject = (Number) ManagementHelper.getResult(message);
|
||||||
|
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
|
||||||
|
byteBuffer.putLong(idObject.longValue());
|
||||||
|
return byteBuffer.array();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ClientMessage getQueueAttribute(String queueName, String attribute) throws Exception {
|
||||||
|
ClientSession managementSession = getManagementSession();
|
||||||
|
managementSession.start();
|
||||||
|
|
||||||
|
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
|
||||||
|
ClientMessage managementMessage = managementSession.createMessage(false);
|
||||||
|
ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queueName, attribute);
|
||||||
|
managementSession.start();
|
||||||
|
ClientMessage reply = requestor.request(managementMessage);
|
||||||
|
return reply;
|
||||||
|
} finally {
|
||||||
|
managementSession.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getQueueFromFQQN(String fqqn) {
|
||||||
|
return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) + FQQN_SEPERATOR.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getAddressFromFQQN(String fqqn) {
|
||||||
|
return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(), fqqn.indexOf(FQQN_SEPERATOR));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getFQQNFromDestination(String destination) {
|
||||||
|
return destination.substring(destination.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,12 +19,21 @@ package org.apache.activemq.artemis.cli.commands.messages;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
import io.airlift.airline.Option;
|
import io.airlift.airline.Option;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
|
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||||
|
|
||||||
@Command(name = "producer", description = "It will send messages to an instance")
|
@Command(name = "producer", description = "It will send messages to an instance")
|
||||||
public class Producer extends DestAbstract {
|
public class Producer extends DestAbstract {
|
||||||
|
@ -49,6 +58,9 @@ public class Producer extends DestAbstract {
|
||||||
@Option(name = "--group", description = "Message Group to be used")
|
@Option(name = "--group", description = "Message Group to be used")
|
||||||
String msgGroupID = null;
|
String msgGroupID = null;
|
||||||
|
|
||||||
|
@Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.")
|
||||||
|
String fileName = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
super.execute(context);
|
super.execute(context);
|
||||||
|
@ -56,35 +68,100 @@ public class Producer extends DestAbstract {
|
||||||
ConnectionFactory factory = createConnectionFactory();
|
ConnectionFactory factory = createConnectionFactory();
|
||||||
|
|
||||||
try (Connection connection = factory.createConnection()) {
|
try (Connection connection = factory.createConnection()) {
|
||||||
ProducerThread[] threadsArray = new ProducerThread[threads];
|
|
||||||
for (int i = 0; i < threads; i++) {
|
byte[] queueId = null;
|
||||||
Session session;
|
boolean isFQQN = isFQQN();
|
||||||
if (txBatchSize > 0) {
|
if (isFQQN) {
|
||||||
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
queueId = getQueueIdFromName(getQueueFromFQQN(destination));
|
||||||
} else {
|
}
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
|
// If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation.
|
||||||
|
if (fileName != null) {
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Destination dest = lookupDestination(session, isFQQN);
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(dest);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
int messageCount = 0;
|
||||||
|
try {
|
||||||
|
MessageSerializer serializer = getMessageSerializer();
|
||||||
|
serializer.setInput(new FileInputStream(fileName), session);
|
||||||
|
serializer.start();
|
||||||
|
|
||||||
|
Message message = serializer.read();
|
||||||
|
|
||||||
|
while (message != null) {
|
||||||
|
if (queueId != null) ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
|
||||||
|
producer.send(message);
|
||||||
|
message = serializer.read();
|
||||||
|
messageCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
serializer.stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Error occurred during import. Rolling back.");
|
||||||
|
session.rollback();
|
||||||
|
e.printStackTrace();
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
Destination dest = lookupDestination(session);
|
System.out.println("Sent " + messageCount + " Messages.");
|
||||||
threadsArray[i] = new ProducerThread(session, dest, i);
|
return messageCount;
|
||||||
|
} else {
|
||||||
|
ProducerThread[] threadsArray = new ProducerThread[threads];
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
Session session;
|
||||||
|
if (txBatchSize > 0) {
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
} else {
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
Destination dest = lookupDestination(session, isFQQN);
|
||||||
|
threadsArray[i] = new ProducerThread(session, dest, i);
|
||||||
|
|
||||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
||||||
setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize).
|
setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize).
|
||||||
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
|
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
|
||||||
setMessageCount(messageCount);
|
setMessageCount(messageCount).setQueueId(queueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ProducerThread thread : threadsArray) {
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
int messagesProduced = 0;
|
||||||
|
for (ProducerThread thread : threadsArray) {
|
||||||
|
thread.join();
|
||||||
|
messagesProduced += thread.getSentCount();
|
||||||
|
}
|
||||||
|
return messagesProduced;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ProducerThread thread : threadsArray) {
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
int messagesProduced = 0;
|
|
||||||
for (ProducerThread thread : threadsArray) {
|
|
||||||
thread.join();
|
|
||||||
messagesProduced += thread.getSentCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
return messagesProduced;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Destination lookupDestination(Session session, boolean isFQQN) throws Exception {
|
||||||
|
Destination dest;
|
||||||
|
if (!isFQQN) {
|
||||||
|
dest = lookupDestination(session);
|
||||||
|
} else {
|
||||||
|
String address = getAddressFromFQQN(destination);
|
||||||
|
if (isFQQNAnycast(getQueueFromFQQN(destination))) {
|
||||||
|
String queue = getQueueFromFQQN(destination);
|
||||||
|
if (!queue.equals(address)) {
|
||||||
|
throw new ActiveMQException("FQQN support is limited to Anycast queues where the queue name equals the address.");
|
||||||
|
}
|
||||||
|
dest = session.createQueue(address);
|
||||||
|
} else {
|
||||||
|
dest = session.createTopic(address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dest;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isFQQNAnycast(String queueName) throws Exception {
|
||||||
|
ClientMessage message = getQueueAttribute(queueName, "RoutingType");
|
||||||
|
String routingType = (String) ManagementHelper.getResult(message);
|
||||||
|
return routingType.equalsIgnoreCase("anycast");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.io.InputStreamReader;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
|
||||||
public class ProducerThread extends Thread {
|
public class ProducerThread extends Thread {
|
||||||
|
@ -48,6 +49,7 @@ public class ProducerThread extends Thread {
|
||||||
long msgTTL = 0L;
|
long msgTTL = 0L;
|
||||||
String msgGroupID = null;
|
String msgGroupID = null;
|
||||||
int transactionBatchSize;
|
int transactionBatchSize;
|
||||||
|
byte[] queueId = null;
|
||||||
|
|
||||||
int transactions = 0;
|
int transactions = 0;
|
||||||
final AtomicInteger sentCount = new AtomicInteger(0);
|
final AtomicInteger sentCount = new AtomicInteger(0);
|
||||||
|
@ -121,6 +123,11 @@ public class ProducerThread extends Thread {
|
||||||
|
|
||||||
private void sendMessage(MessageProducer producer, String threadName) throws Exception {
|
private void sendMessage(MessageProducer producer, String threadName) throws Exception {
|
||||||
Message message = createMessage(sentCount.get(), threadName);
|
Message message = createMessage(sentCount.get(), threadName);
|
||||||
|
|
||||||
|
if (queueId != null) {
|
||||||
|
((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
|
||||||
|
}
|
||||||
|
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
|
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
|
||||||
|
@ -370,4 +377,8 @@ public class ProducerThread extends Thread {
|
||||||
this.objectSize = objectSize;
|
this.objectSize = objectSize;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setQueueId(byte[] queueId) {
|
||||||
|
this.queueId = queueId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.cli.commands.tools.xml;
|
||||||
|
|
||||||
|
import javax.xml.stream.XMLStreamException;
|
||||||
|
import javax.xml.stream.XMLStreamWriter;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
||||||
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
|
import org.apache.activemq.artemis.reader.TextMessageUtil;
|
||||||
|
|
||||||
|
|
||||||
|
/** This is an Utility class that will import the outputs in XML format. */
|
||||||
|
public class XMLMessageExporter {
|
||||||
|
|
||||||
|
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
|
||||||
|
|
||||||
|
private XMLStreamWriter xmlWriter;
|
||||||
|
|
||||||
|
public XMLMessageExporter(XMLStreamWriter xmlWriter) {
|
||||||
|
this.xmlWriter = xmlWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public XMLStreamWriter getRawXMLWriter() {
|
||||||
|
return xmlWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printSingleMessageAsXML(ICoreMessage message, List<String> queues, boolean encodeTextUTF8) throws Exception {
|
||||||
|
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
|
||||||
|
printMessageAttributes(message);
|
||||||
|
printMessageProperties(message);
|
||||||
|
printMessageQueues(queues);
|
||||||
|
printMessageBody(message.toCore(), encodeTextUTF8);
|
||||||
|
xmlWriter.writeEndElement(); // end MESSAGES_CHILD
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printMessageBody(Message message, boolean encodeTextMessageUTF8) throws Exception {
|
||||||
|
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
|
||||||
|
|
||||||
|
if (message.isLargeMessage()) {
|
||||||
|
printLargeMessageBody((LargeServerMessage) message);
|
||||||
|
} else {
|
||||||
|
if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) {
|
||||||
|
xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString());
|
||||||
|
} else {
|
||||||
|
xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBodyBase64(message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
xmlWriter.writeEndElement(); // end MESSAGE_BODY
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
|
||||||
|
LargeBodyEncoder encoder = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
encoder = message.toCore().getBodyEncoder();
|
||||||
|
encoder.open();
|
||||||
|
long totalBytesWritten = 0;
|
||||||
|
Long bufferSize;
|
||||||
|
long bodySize = encoder.getLargeBodySize();
|
||||||
|
for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
|
||||||
|
Long remainder = bodySize - totalBytesWritten;
|
||||||
|
if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
|
||||||
|
bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
|
||||||
|
} else {
|
||||||
|
bufferSize = remainder;
|
||||||
|
}
|
||||||
|
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
|
||||||
|
encoder.encode(buffer, bufferSize.intValue());
|
||||||
|
xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
|
||||||
|
totalBytesWritten += bufferSize;
|
||||||
|
}
|
||||||
|
encoder.close();
|
||||||
|
} catch (ActiveMQException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
if (encoder != null) {
|
||||||
|
try {
|
||||||
|
encoder.close();
|
||||||
|
} catch (ActiveMQException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printMessageQueues(List<String> queues) throws XMLStreamException {
|
||||||
|
if (queues != null) {
|
||||||
|
xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
|
||||||
|
for (String queueName : queues) {
|
||||||
|
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
|
||||||
|
}
|
||||||
|
xmlWriter.writeEndElement(); // end QUEUES_PARENT
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printMessageProperties(Message message) throws XMLStreamException {
|
||||||
|
xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
|
||||||
|
for (SimpleString key : message.getPropertyNames()) {
|
||||||
|
Object value = message.getObjectProperty(key);
|
||||||
|
xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
|
||||||
|
|
||||||
|
// Write the property type as an attribute
|
||||||
|
String propertyType = XmlDataExporterUtil.getPropertyType(value);
|
||||||
|
if (propertyType != null) {
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
|
||||||
|
String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
|
||||||
|
if (message.getUserID() != null) {
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,331 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.cli.commands.tools.xml;
|
||||||
|
|
||||||
|
import javax.xml.stream.XMLStreamConstants;
|
||||||
|
import javax.xml.stream.XMLStreamException;
|
||||||
|
import javax.xml.stream.XMLStreamReader;
|
||||||
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.utils.Base64;
|
||||||
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
/** This is an Utility class that will import the outputs in XML format. */
|
||||||
|
public class XMLMessageImporter {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(XMLMessageImporter.class);
|
||||||
|
|
||||||
|
private XMLStreamReader reader;
|
||||||
|
|
||||||
|
private ClientSession session;
|
||||||
|
|
||||||
|
Map<String, String> oldPrefixTranslation = new HashMap<>();
|
||||||
|
|
||||||
|
public XMLMessageImporter(XMLStreamReader xmlStreamReader, ClientSession session) {
|
||||||
|
this.reader = xmlStreamReader;
|
||||||
|
this.session = session;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOldPrefixTranslation(Map<String, String> oldPrefixTranslation) {
|
||||||
|
this.oldPrefixTranslation = oldPrefixTranslation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public XMLStreamReader getRawXMLReader() {
|
||||||
|
return reader;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageInfo readMessage(boolean decodeUTF8) throws Exception {
|
||||||
|
if (!reader.hasNext()) return null;
|
||||||
|
|
||||||
|
Byte type = 0;
|
||||||
|
Byte priority = 0;
|
||||||
|
Long expiration = 0L;
|
||||||
|
Long timestamp = 0L;
|
||||||
|
Long id = 0L;
|
||||||
|
org.apache.activemq.artemis.utils.UUID userId = null;
|
||||||
|
ArrayList<String> queues = new ArrayList<>();
|
||||||
|
|
||||||
|
// get message's attributes
|
||||||
|
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||||
|
String attributeName = reader.getAttributeLocalName(i);
|
||||||
|
switch (attributeName) {
|
||||||
|
case XmlDataConstants.MESSAGE_TYPE:
|
||||||
|
type = getMessageType(reader.getAttributeValue(i));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.MESSAGE_PRIORITY:
|
||||||
|
priority = Byte.parseByte(reader.getAttributeValue(i));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.MESSAGE_EXPIRATION:
|
||||||
|
expiration = Long.parseLong(reader.getAttributeValue(i));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.MESSAGE_TIMESTAMP:
|
||||||
|
timestamp = Long.parseLong(reader.getAttributeValue(i));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.MESSAGE_USER_ID:
|
||||||
|
userId = UUIDGenerator.getInstance().generateUUID();
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.MESSAGE_ID:
|
||||||
|
id = Long.parseLong(reader.getAttributeValue(i));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Message message = session.createMessage(type, true, expiration, timestamp, priority);
|
||||||
|
|
||||||
|
message.setUserID(userId);
|
||||||
|
|
||||||
|
boolean endLoop = false;
|
||||||
|
|
||||||
|
File largeMessageTemporaryFile = null;
|
||||||
|
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
|
||||||
|
while (reader.hasNext()) {
|
||||||
|
int eventType = reader.getEventType();
|
||||||
|
switch (eventType) {
|
||||||
|
case XMLStreamConstants.START_ELEMENT:
|
||||||
|
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
|
||||||
|
largeMessageTemporaryFile = processMessageBody(message.toCore(), decodeUTF8);
|
||||||
|
} else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
|
||||||
|
processMessageProperties(message);
|
||||||
|
} else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
|
||||||
|
processMessageQueues(queues);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case XMLStreamConstants.END_ELEMENT:
|
||||||
|
if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
|
||||||
|
endLoop = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (endLoop) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
reader.next();
|
||||||
|
}
|
||||||
|
return new MessageInfo(id, queues, message, largeMessageTemporaryFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Byte getMessageType(String value) {
|
||||||
|
Byte type = Message.DEFAULT_TYPE;
|
||||||
|
switch (value) {
|
||||||
|
case XmlDataConstants.DEFAULT_TYPE_PRETTY:
|
||||||
|
type = Message.DEFAULT_TYPE;
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.BYTES_TYPE_PRETTY:
|
||||||
|
type = Message.BYTES_TYPE;
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.MAP_TYPE_PRETTY:
|
||||||
|
type = Message.MAP_TYPE;
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.OBJECT_TYPE_PRETTY:
|
||||||
|
type = Message.OBJECT_TYPE;
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.STREAM_TYPE_PRETTY:
|
||||||
|
type = Message.STREAM_TYPE;
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.TEXT_TYPE_PRETTY:
|
||||||
|
type = Message.TEXT_TYPE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processMessageQueues(ArrayList<String> queues) {
|
||||||
|
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||||
|
if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
|
||||||
|
String queueName = reader.getAttributeValue(i);
|
||||||
|
String translation = checkPrefix(queueName);
|
||||||
|
queues.add(translation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String checkPrefix(String queueName) {
|
||||||
|
String newQueueName = oldPrefixTranslation.get(queueName);
|
||||||
|
if (newQueueName == null) {
|
||||||
|
newQueueName = queueName;
|
||||||
|
}
|
||||||
|
return newQueueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processMessageProperties(Message message) {
|
||||||
|
String key = "";
|
||||||
|
String value = "";
|
||||||
|
String propertyType = "";
|
||||||
|
|
||||||
|
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||||
|
String attributeName = reader.getAttributeLocalName(i);
|
||||||
|
switch (attributeName) {
|
||||||
|
case XmlDataConstants.PROPERTY_NAME:
|
||||||
|
key = reader.getAttributeValue(i);
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_VALUE:
|
||||||
|
value = reader.getAttributeValue(i);
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE:
|
||||||
|
propertyType = reader.getAttributeValue(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value.equals(XmlDataConstants.NULL)) {
|
||||||
|
value = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (propertyType) {
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_SHORT:
|
||||||
|
message.putShortProperty(key, Short.parseShort(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
|
||||||
|
message.putBooleanProperty(key, Boolean.parseBoolean(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_BYTE:
|
||||||
|
message.putByteProperty(key, Byte.parseByte(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_BYTES:
|
||||||
|
message.putBytesProperty(key, value == null ? null : decode(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
|
||||||
|
message.putDoubleProperty(key, Double.parseDouble(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_FLOAT:
|
||||||
|
message.putFloatProperty(key, Float.parseFloat(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_INTEGER:
|
||||||
|
message.putIntProperty(key, Integer.parseInt(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_LONG:
|
||||||
|
message.putLongProperty(key, Long.parseLong(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
|
||||||
|
message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
|
||||||
|
break;
|
||||||
|
case XmlDataConstants.PROPERTY_TYPE_STRING:
|
||||||
|
message.putStringProperty(key, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private File processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException {
|
||||||
|
File tempFileName = null;
|
||||||
|
boolean isLarge = false;
|
||||||
|
|
||||||
|
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||||
|
String attributeName = reader.getAttributeLocalName(i);
|
||||||
|
if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
|
||||||
|
isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reader.next();
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("XMLStreamReader impl: " + reader);
|
||||||
|
}
|
||||||
|
if (isLarge) {
|
||||||
|
tempFileName = File.createTempFile("largeMessage", ".tmp");
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Creating temp file " + tempFileName + " for large message.");
|
||||||
|
}
|
||||||
|
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
|
||||||
|
getMessageBodyBytes(bytes -> out.write(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
|
||||||
|
}
|
||||||
|
FileInputStream fileInputStream = new FileInputStream(tempFileName);
|
||||||
|
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
|
||||||
|
((ClientMessage) message).setBodyInputStream(bufferedInput);
|
||||||
|
} else {
|
||||||
|
getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tempFileName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
|
||||||
|
* read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
|
||||||
|
* to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
|
||||||
|
* CDATA has to be decoded in its entirety.
|
||||||
|
*
|
||||||
|
* @param processor used to deal with the decoded CDATA elements
|
||||||
|
* @param textMessage If this a text message we decode UTF8 and encode as a simple string
|
||||||
|
*/
|
||||||
|
private void getMessageBodyBytes(MessageBodyBytesProcessor processor, boolean decodeTextMessage) throws IOException, XMLStreamException {
|
||||||
|
int currentEventType;
|
||||||
|
StringBuilder cdata = new StringBuilder();
|
||||||
|
while (reader.hasNext()) {
|
||||||
|
currentEventType = reader.getEventType();
|
||||||
|
if (currentEventType == XMLStreamConstants.END_ELEMENT) {
|
||||||
|
break;
|
||||||
|
} else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
|
||||||
|
/* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
|
||||||
|
* the processor, and reset the cdata for the next event(s)
|
||||||
|
*/
|
||||||
|
if (decodeTextMessage) {
|
||||||
|
SimpleString text = new SimpleString(cdata.toString());
|
||||||
|
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(SimpleString.sizeofNullableString(text));
|
||||||
|
SimpleString.writeNullableSimpleString(byteBuf, text);
|
||||||
|
byte[] bytes = new byte[SimpleString.sizeofNullableString(text)];
|
||||||
|
byteBuf.readBytes(bytes);
|
||||||
|
processor.processBodyBytes(bytes);
|
||||||
|
} else {
|
||||||
|
processor.processBodyBytes(decode(cdata.toString()));
|
||||||
|
cdata.setLength(0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
|
||||||
|
}
|
||||||
|
reader.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] decode(String data) {
|
||||||
|
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface MessageBodyBytesProcessor {
|
||||||
|
void processBodyBytes(byte[] bytes) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
public class MessageInfo {
|
||||||
|
public long id;
|
||||||
|
public List<String> queues;
|
||||||
|
public Message message;
|
||||||
|
public File tempFile;
|
||||||
|
|
||||||
|
MessageInfo(long id, List<String> queues, Message message, File tempFile) {
|
||||||
|
this.message = message;
|
||||||
|
this.queues = queues;
|
||||||
|
this.id = id;
|
||||||
|
this.tempFile = tempFile;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,7 +26,7 @@ public final class XmlDataConstants {
|
||||||
// Utility
|
// Utility
|
||||||
}
|
}
|
||||||
|
|
||||||
static final String XML_VERSION = "1.0";
|
public static final String XML_VERSION = "1.0";
|
||||||
static final String DOCUMENT_PARENT = "activemq-journal";
|
static final String DOCUMENT_PARENT = "activemq-journal";
|
||||||
static final String BINDINGS_PARENT = "bindings";
|
static final String BINDINGS_PARENT = "bindings";
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ public final class XmlDataConstants {
|
||||||
static final String ADDRESS_BINDING_ID = "id";
|
static final String ADDRESS_BINDING_ID = "id";
|
||||||
static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
|
static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
|
||||||
|
|
||||||
static final String MESSAGES_PARENT = "messages";
|
public static final String MESSAGES_PARENT = "messages";
|
||||||
static final String MESSAGES_CHILD = "message";
|
static final String MESSAGES_CHILD = "message";
|
||||||
static final String MESSAGE_ID = "id";
|
static final String MESSAGE_ID = "id";
|
||||||
static final String MESSAGE_PRIORITY = "priority";
|
static final String MESSAGE_PRIORITY = "priority";
|
||||||
|
|
|
@ -36,7 +36,6 @@ import java.util.TreeMap;
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -48,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.Journal;
|
||||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
||||||
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
|
||||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
|
@ -66,12 +64,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
|
||||||
|
|
||||||
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
|
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
|
||||||
public final class XmlDataExporter extends DBOption {
|
public final class XmlDataExporter extends DBOption {
|
||||||
|
|
||||||
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
|
|
||||||
private XMLStreamWriter xmlWriter;
|
private XMLStreamWriter xmlWriter;
|
||||||
|
|
||||||
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
|
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
|
||||||
|
@ -92,6 +88,8 @@ public final class XmlDataExporter extends DBOption {
|
||||||
|
|
||||||
long bindingsPrinted = 0L;
|
long bindingsPrinted = 0L;
|
||||||
|
|
||||||
|
XMLMessageExporter exporter;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
super.execute(context);
|
super.execute(context);
|
||||||
|
@ -141,7 +139,7 @@ public final class XmlDataExporter extends DBOption {
|
||||||
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
|
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
|
||||||
PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
|
PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
|
||||||
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
|
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
|
||||||
|
exporter = new XMLMessageExporter(xmlWriter);
|
||||||
writeXMLData();
|
writeXMLData();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -317,7 +315,7 @@ public final class XmlDataExporter extends DBOption {
|
||||||
|
|
||||||
private void printDataAsXML() {
|
private void printDataAsXML() {
|
||||||
try {
|
try {
|
||||||
xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
|
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
|
||||||
printBindingsAsXML();
|
printBindingsAsXML();
|
||||||
printAllMessagesAsXML();
|
printAllMessagesAsXML();
|
||||||
|
@ -375,6 +373,10 @@ public final class XmlDataExporter extends DBOption {
|
||||||
xmlWriter.writeEndElement(); // end "messages"
|
xmlWriter.writeEndElement(); // end "messages"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
|
||||||
|
exporter.printSingleMessageAsXML(message, queues, false);
|
||||||
|
messagesPrinted++;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
|
* Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
|
||||||
* from the journal).
|
* from the journal).
|
||||||
|
@ -444,104 +446,9 @@ public final class XmlDataExporter extends DBOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
|
private List<String> extractQueueNames(HashMap<Long, DescribeJournal.ReferenceDescribe> refMap) {
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
|
|
||||||
printMessageAttributes(message);
|
|
||||||
printMessageProperties(message);
|
|
||||||
printMessageQueues(queues);
|
|
||||||
printMessageBody(message.toCore());
|
|
||||||
xmlWriter.writeEndElement(); // end MESSAGES_CHILD
|
|
||||||
messagesPrinted++;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printMessageBody(Message message) throws Exception {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
|
|
||||||
|
|
||||||
if (message.toCore().isLargeMessage()) {
|
|
||||||
printLargeMessageBody((LargeServerMessage) message);
|
|
||||||
} else {
|
|
||||||
xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement(); // end MESSAGE_BODY
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
|
|
||||||
LargeBodyEncoder encoder = null;
|
|
||||||
|
|
||||||
try {
|
|
||||||
encoder = message.toCore().getBodyEncoder();
|
|
||||||
encoder.open();
|
|
||||||
long totalBytesWritten = 0;
|
|
||||||
Long bufferSize;
|
|
||||||
long bodySize = encoder.getLargeBodySize();
|
|
||||||
for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
|
|
||||||
Long remainder = bodySize - totalBytesWritten;
|
|
||||||
if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
|
|
||||||
bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
|
|
||||||
} else {
|
|
||||||
bufferSize = remainder;
|
|
||||||
}
|
|
||||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
|
|
||||||
encoder.encode(buffer, bufferSize.intValue());
|
|
||||||
xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
|
|
||||||
totalBytesWritten += bufferSize;
|
|
||||||
}
|
|
||||||
encoder.close();
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} finally {
|
|
||||||
if (encoder != null) {
|
|
||||||
try {
|
|
||||||
encoder.close();
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printMessageQueues(List<String> queues) throws XMLStreamException {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
|
|
||||||
for (String queueName : queues) {
|
|
||||||
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement(); // end QUEUES_PARENT
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printMessageProperties(Message message) throws XMLStreamException {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
|
|
||||||
for (SimpleString key : message.getPropertyNames()) {
|
|
||||||
Object value = message.getObjectProperty(key);
|
|
||||||
xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
|
|
||||||
|
|
||||||
// Write the property type as an attribute
|
|
||||||
String propertyType = XmlDataExporterUtil.getPropertyType(value);
|
|
||||||
if (propertyType != null) {
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
|
|
||||||
String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
|
|
||||||
if (message.getUserID() != null) {
|
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
|
|
||||||
List<String> queues = new ArrayList<>();
|
List<String> queues = new ArrayList<>();
|
||||||
for (ReferenceDescribe ref : refMap.values()) {
|
for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
|
||||||
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
|
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
|
||||||
}
|
}
|
||||||
return queues;
|
return queues;
|
||||||
|
@ -552,7 +459,7 @@ public final class XmlDataExporter extends DBOption {
|
||||||
/**
|
/**
|
||||||
* Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
|
* Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
|
||||||
*/
|
*/
|
||||||
static class PrettyPrintHandler implements InvocationHandler {
|
public static class PrettyPrintHandler implements InvocationHandler {
|
||||||
|
|
||||||
private final XMLStreamWriter target;
|
private final XMLStreamWriter target;
|
||||||
|
|
||||||
|
@ -564,7 +471,7 @@ public final class XmlDataExporter extends DBOption {
|
||||||
|
|
||||||
boolean wrap = true;
|
boolean wrap = true;
|
||||||
|
|
||||||
PrettyPrintHandler(XMLStreamWriter target) {
|
public PrettyPrintHandler(XMLStreamWriter target) {
|
||||||
this.target = target;
|
this.target = target;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class XmlDataExporterUtil {
|
||||||
/**
|
/**
|
||||||
* Base64 encode a ServerMessage body into the proper XML format
|
* Base64 encode a ServerMessage body into the proper XML format
|
||||||
*/
|
*/
|
||||||
static String encodeMessageBody(final Message message) throws Exception {
|
static String encodeMessageBodyBase64(final Message message) throws Exception {
|
||||||
Preconditions.checkNotNull(message, "ServerMessage can not be null");
|
Preconditions.checkNotNull(message, "ServerMessage can not be null");
|
||||||
|
|
||||||
ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
|
ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
|
||||||
|
|
|
@ -19,25 +19,18 @@ package org.apache.activemq.artemis.cli.commands.tools.xml;
|
||||||
import javax.xml.XMLConstants;
|
import javax.xml.XMLConstants;
|
||||||
import javax.xml.stream.XMLInputFactory;
|
import javax.xml.stream.XMLInputFactory;
|
||||||
import javax.xml.stream.XMLStreamConstants;
|
import javax.xml.stream.XMLStreamConstants;
|
||||||
import javax.xml.stream.XMLStreamException;
|
|
||||||
import javax.xml.stream.XMLStreamReader;
|
import javax.xml.stream.XMLStreamReader;
|
||||||
import javax.xml.transform.stax.StAXSource;
|
import javax.xml.transform.stax.StAXSource;
|
||||||
import javax.xml.validation.Schema;
|
import javax.xml.validation.Schema;
|
||||||
import javax.xml.validation.SchemaFactory;
|
import javax.xml.validation.SchemaFactory;
|
||||||
import javax.xml.validation.Validator;
|
import javax.xml.validation.Validator;
|
||||||
import java.io.BufferedInputStream;
|
|
||||||
import java.io.BufferedOutputStream;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -47,7 +40,6 @@ import java.util.TreeSet;
|
||||||
|
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
import io.airlift.airline.Option;
|
import io.airlift.airline.Option;
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
@ -68,10 +60,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.utils.Base64;
|
|
||||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||||
import org.apache.activemq.artemis.utils.ListUtil;
|
import org.apache.activemq.artemis.utils.ListUtil;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -86,6 +76,8 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
|
|
||||||
private XMLStreamReader reader;
|
private XMLStreamReader reader;
|
||||||
|
|
||||||
|
private XMLMessageImporter messageReader;
|
||||||
|
|
||||||
// this session is really only needed if the "session" variable does not auto-commit sends
|
// this session is really only needed if the "session" variable does not auto-commit sends
|
||||||
ClientSession managementSession;
|
ClientSession managementSession;
|
||||||
|
|
||||||
|
@ -123,7 +115,7 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
@Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports")
|
@Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports")
|
||||||
public boolean legacyPrefixes = false;
|
public boolean legacyPrefixes = false;
|
||||||
|
|
||||||
TreeSet<MessageTemp> messages;
|
TreeSet<XMLMessageImporter.MessageInfo> messages;
|
||||||
|
|
||||||
public String getPassword() {
|
public String getPassword() {
|
||||||
return password;
|
return password;
|
||||||
|
@ -179,6 +171,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
ClientSession session,
|
ClientSession session,
|
||||||
ClientSession managementSession) throws Exception {
|
ClientSession managementSession) throws Exception {
|
||||||
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
|
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
|
||||||
|
messageReader = new XMLMessageImporter(reader, session);
|
||||||
|
messageReader.setOldPrefixTranslation(oldPrefixTranslation);
|
||||||
|
|
||||||
this.session = session;
|
this.session = session;
|
||||||
if (managementSession != null) {
|
if (managementSession != null) {
|
||||||
this.managementSession = managementSession;
|
this.managementSession = managementSession;
|
||||||
|
@ -237,9 +232,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
|
|
||||||
private void processXml() throws Exception {
|
private void processXml() throws Exception {
|
||||||
if (sort) {
|
if (sort) {
|
||||||
messages = new TreeSet<MessageTemp>(new Comparator<MessageTemp>() {
|
messages = new TreeSet<XMLMessageImporter.MessageInfo>(new Comparator<XMLMessageImporter.MessageInfo>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(MessageTemp o1, MessageTemp o2) {
|
public int compare(XMLMessageImporter.MessageInfo o1, XMLMessageImporter.MessageInfo o2) {
|
||||||
if (o1.id == o2.id) {
|
if (o1.id == o2.id) {
|
||||||
return 0;
|
return 0;
|
||||||
} else if (o1.id > o2.id) {
|
} else if (o1.id > o2.id) {
|
||||||
|
@ -270,8 +265,8 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sort) {
|
if (sort) {
|
||||||
for (MessageTemp msgtmp : messages) {
|
for (XMLMessageImporter.MessageInfo msgtmp : messages) {
|
||||||
sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName);
|
sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,118 +283,14 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processMessage() throws Exception {
|
private void processMessage() throws Exception {
|
||||||
Byte type = 0;
|
XMLMessageImporter.MessageInfo info = messageReader.readMessage(false);
|
||||||
Byte priority = 0;
|
|
||||||
Long expiration = 0L;
|
|
||||||
Long timestamp = 0L;
|
|
||||||
Long id = 0L;
|
|
||||||
org.apache.activemq.artemis.utils.UUID userId = null;
|
|
||||||
ArrayList<String> queues = new ArrayList<>();
|
|
||||||
|
|
||||||
// get message's attributes
|
|
||||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
|
||||||
String attributeName = reader.getAttributeLocalName(i);
|
|
||||||
switch (attributeName) {
|
|
||||||
case XmlDataConstants.MESSAGE_TYPE:
|
|
||||||
type = getMessageType(reader.getAttributeValue(i));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.MESSAGE_PRIORITY:
|
|
||||||
priority = Byte.parseByte(reader.getAttributeValue(i));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.MESSAGE_EXPIRATION:
|
|
||||||
expiration = Long.parseLong(reader.getAttributeValue(i));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.MESSAGE_TIMESTAMP:
|
|
||||||
timestamp = Long.parseLong(reader.getAttributeValue(i));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.MESSAGE_USER_ID:
|
|
||||||
userId = UUIDGenerator.getInstance().generateUUID();
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.MESSAGE_ID:
|
|
||||||
id = Long.parseLong(reader.getAttributeValue(i));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Message message = session.createMessage(type, true, expiration, timestamp, priority);
|
|
||||||
message.setUserID(userId);
|
|
||||||
|
|
||||||
boolean endLoop = false;
|
|
||||||
|
|
||||||
File largeMessageTemporaryFile = null;
|
|
||||||
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
|
|
||||||
while (reader.hasNext()) {
|
|
||||||
int eventType = reader.getEventType();
|
|
||||||
switch (eventType) {
|
|
||||||
case XMLStreamConstants.START_ELEMENT:
|
|
||||||
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
|
|
||||||
largeMessageTemporaryFile = processMessageBody(message.toCore());
|
|
||||||
} else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
|
|
||||||
processMessageProperties(message);
|
|
||||||
} else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
|
|
||||||
processMessageQueues(queues);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case XMLStreamConstants.END_ELEMENT:
|
|
||||||
if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
|
|
||||||
endLoop = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (endLoop) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sort) {
|
if (sort) {
|
||||||
messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile));
|
messages.add(info);
|
||||||
} else {
|
} else {
|
||||||
sendMessage(queues, message, largeMessageTemporaryFile);
|
sendMessage(info.queues, info.message, info.tempFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class MessageTemp {
|
|
||||||
long id;
|
|
||||||
List<String> queues;
|
|
||||||
Message message;
|
|
||||||
File tempFileName;
|
|
||||||
|
|
||||||
MessageTemp(long id, List<String> queues, Message message, File tempFileName) {
|
|
||||||
this.message = message;
|
|
||||||
this.queues = queues;
|
|
||||||
this.message = message;
|
|
||||||
this.id = id;
|
|
||||||
this.tempFileName = tempFileName;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Byte getMessageType(String value) {
|
|
||||||
Byte type = Message.DEFAULT_TYPE;
|
|
||||||
switch (value) {
|
|
||||||
case XmlDataConstants.DEFAULT_TYPE_PRETTY:
|
|
||||||
type = Message.DEFAULT_TYPE;
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.BYTES_TYPE_PRETTY:
|
|
||||||
type = Message.BYTES_TYPE;
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.MAP_TYPE_PRETTY:
|
|
||||||
type = Message.MAP_TYPE;
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.OBJECT_TYPE_PRETTY:
|
|
||||||
type = Message.OBJECT_TYPE;
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.STREAM_TYPE_PRETTY:
|
|
||||||
type = Message.STREAM_TYPE;
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.TEXT_TYPE_PRETTY:
|
|
||||||
type = Message.TEXT_TYPE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMessage(List<String> queues, Message message, File tempFileName) throws Exception {
|
private void sendMessage(List<String> queues, Message message, File tempFileName) throws Exception {
|
||||||
StringBuilder logMessage = new StringBuilder();
|
StringBuilder logMessage = new StringBuilder();
|
||||||
String destination = addressMap.get(queues.get(0));
|
String destination = addressMap.get(queues.get(0));
|
||||||
|
@ -460,153 +351,6 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processMessageQueues(ArrayList<String> queues) {
|
|
||||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
|
||||||
if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
|
|
||||||
String queueName = reader.getAttributeValue(i);
|
|
||||||
String translation = checkPrefix(queueName);
|
|
||||||
queues.add(translation);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String checkPrefix(String queueName) {
|
|
||||||
String newQueueName = oldPrefixTranslation.get(queueName);
|
|
||||||
if (newQueueName == null) {
|
|
||||||
newQueueName = queueName;
|
|
||||||
}
|
|
||||||
return newQueueName;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processMessageProperties(Message message) {
|
|
||||||
String key = "";
|
|
||||||
String value = "";
|
|
||||||
String propertyType = "";
|
|
||||||
|
|
||||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
|
||||||
String attributeName = reader.getAttributeLocalName(i);
|
|
||||||
switch (attributeName) {
|
|
||||||
case XmlDataConstants.PROPERTY_NAME:
|
|
||||||
key = reader.getAttributeValue(i);
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_VALUE:
|
|
||||||
value = reader.getAttributeValue(i);
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE:
|
|
||||||
propertyType = reader.getAttributeValue(i);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value.equals(XmlDataConstants.NULL)) {
|
|
||||||
value = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (propertyType) {
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_SHORT:
|
|
||||||
message.putShortProperty(key, Short.parseShort(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
|
|
||||||
message.putBooleanProperty(key, Boolean.parseBoolean(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_BYTE:
|
|
||||||
message.putByteProperty(key, Byte.parseByte(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_BYTES:
|
|
||||||
message.putBytesProperty(key, value == null ? null : decode(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
|
|
||||||
message.putDoubleProperty(key, Double.parseDouble(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_FLOAT:
|
|
||||||
message.putFloatProperty(key, Float.parseFloat(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_INTEGER:
|
|
||||||
message.putIntProperty(key, Integer.parseInt(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_LONG:
|
|
||||||
message.putLongProperty(key, Long.parseLong(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
|
|
||||||
message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
|
|
||||||
break;
|
|
||||||
case XmlDataConstants.PROPERTY_TYPE_STRING:
|
|
||||||
message.putStringProperty(key, value);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
|
|
||||||
File tempFileName = null;
|
|
||||||
boolean isLarge = false;
|
|
||||||
|
|
||||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
|
||||||
String attributeName = reader.getAttributeLocalName(i);
|
|
||||||
if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
|
|
||||||
isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("XMLStreamReader impl: " + reader);
|
|
||||||
}
|
|
||||||
if (isLarge) {
|
|
||||||
tempFileName = File.createTempFile("largeMessage", ".tmp");
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Creating temp file " + tempFileName + " for large message.");
|
|
||||||
}
|
|
||||||
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
|
|
||||||
getMessageBodyBytes(new MessageBodyBytesProcessor() {
|
|
||||||
@Override
|
|
||||||
public void processBodyBytes(byte[] bytes) throws IOException {
|
|
||||||
out.write(bytes);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
FileInputStream fileInputStream = new FileInputStream(tempFileName);
|
|
||||||
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
|
|
||||||
((ClientMessage) message).setBodyInputStream(bufferedInput);
|
|
||||||
} else {
|
|
||||||
getMessageBodyBytes(new MessageBodyBytesProcessor() {
|
|
||||||
@Override
|
|
||||||
public void processBodyBytes(byte[] bytes) throws IOException {
|
|
||||||
message.getBodyBuffer().writeBytes(bytes);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return tempFileName;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
|
|
||||||
* read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
|
|
||||||
* to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
|
|
||||||
* CDATA has to be decoded in its entirety.
|
|
||||||
*
|
|
||||||
* @param processor used to deal with the decoded CDATA elements
|
|
||||||
*/
|
|
||||||
private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
|
|
||||||
int currentEventType;
|
|
||||||
StringBuilder cdata = new StringBuilder();
|
|
||||||
while (reader.hasNext()) {
|
|
||||||
currentEventType = reader.getEventType();
|
|
||||||
if (currentEventType == XMLStreamConstants.END_ELEMENT) {
|
|
||||||
break;
|
|
||||||
} else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
|
|
||||||
/* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
|
|
||||||
* the processor, and reset the cdata for the next event(s)
|
|
||||||
*/
|
|
||||||
processor.processBodyBytes(decode(cdata.toString()));
|
|
||||||
cdata.setLength(0);
|
|
||||||
} else {
|
|
||||||
cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void oldBinding() throws Exception {
|
private void oldBinding() throws Exception {
|
||||||
String queueName = "";
|
String queueName = "";
|
||||||
String address = "";
|
String address = "";
|
||||||
|
@ -762,11 +506,5 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] decode(String data) {
|
|
||||||
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
|
||||||
}
|
|
||||||
|
|
||||||
private interface MessageBodyBytesProcessor {
|
|
||||||
void processBodyBytes(byte[] bytes) throws IOException;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.cli.factory.serialize;
|
||||||
|
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
public interface MessageSerializer {
|
||||||
|
|
||||||
|
Message read() throws Exception;
|
||||||
|
|
||||||
|
void write(Message message);
|
||||||
|
|
||||||
|
void setOutput(OutputStream out) throws Exception;
|
||||||
|
|
||||||
|
void setInput(InputStream in, Session session) throws Exception;
|
||||||
|
|
||||||
|
void start() throws Exception;
|
||||||
|
|
||||||
|
void stop() throws Exception;
|
||||||
|
}
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.cli.factory.serialize;
|
||||||
|
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.xml.stream.XMLInputFactory;
|
||||||
|
import javax.xml.stream.XMLOutputFactory;
|
||||||
|
import javax.xml.stream.XMLStreamReader;
|
||||||
|
import javax.xml.stream.XMLStreamWriter;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.lang.reflect.Proxy;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageExporter;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageImporter;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
|
||||||
|
|
||||||
|
public class XMLMessageSerializer implements MessageSerializer {
|
||||||
|
|
||||||
|
private XMLMessageExporter writer;
|
||||||
|
|
||||||
|
private XMLMessageImporter reader;
|
||||||
|
|
||||||
|
private ClientSession clientSession;
|
||||||
|
|
||||||
|
private OutputStream out;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized Message read() throws Exception {
|
||||||
|
reader.getRawXMLReader().nextTag();
|
||||||
|
|
||||||
|
// End of document.
|
||||||
|
if (reader.getRawXMLReader().getLocalName().equals("messages")) return null;
|
||||||
|
|
||||||
|
XMLMessageImporter.MessageInfo messageInfo = reader.readMessage(true);
|
||||||
|
if (messageInfo == null) return null;
|
||||||
|
|
||||||
|
// This is a large message
|
||||||
|
ActiveMQMessage jmsMessage = new ActiveMQMessage((ClientMessage) messageInfo.message, clientSession);
|
||||||
|
if (messageInfo.tempFile != null) {
|
||||||
|
jmsMessage.setInputStream(new FileInputStream(messageInfo.tempFile));
|
||||||
|
}
|
||||||
|
return jmsMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void write(Message message) {
|
||||||
|
try {
|
||||||
|
ICoreMessage core = ((ActiveMQMessage) message).getCoreMessage();
|
||||||
|
writer.printSingleMessageAsXML(core, null, true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setOutput(OutputStream outputStream) throws Exception {
|
||||||
|
this.out = outputStream;
|
||||||
|
XMLOutputFactory factory = XMLOutputFactory.newInstance();
|
||||||
|
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(outputStream, "UTF-8");
|
||||||
|
XmlDataExporter.PrettyPrintHandler handler = new XmlDataExporter.PrettyPrintHandler(rawXmlWriter);
|
||||||
|
XMLStreamWriter xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
|
||||||
|
this.writer = new XMLMessageExporter(xmlWriter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setInput(InputStream inputStream, Session session) throws Exception {
|
||||||
|
XMLStreamReader streamReader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
|
||||||
|
this.clientSession = ((ActiveMQSession) session).getCoreSession();
|
||||||
|
this.reader = new XMLMessageImporter(streamReader, clientSession);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void start() throws Exception {
|
||||||
|
if (writer != null) {
|
||||||
|
writer.getRawXMLWriter().writeStartDocument(XmlDataConstants.XML_VERSION);
|
||||||
|
writer.getRawXMLWriter().writeStartElement(XmlDataConstants.MESSAGES_PARENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reader != null) {
|
||||||
|
// <messages>
|
||||||
|
reader.getRawXMLReader().nextTag();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void stop() throws Exception {
|
||||||
|
if (writer != null) {
|
||||||
|
writer.getRawXMLWriter().writeEndElement();
|
||||||
|
writer.getRawXMLWriter().writeEndDocument();
|
||||||
|
writer.getRawXMLWriter().flush();
|
||||||
|
writer.getRawXMLWriter().close();
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,362 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.cli.test;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.ObjectMessage;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.cli.Artemis;
|
||||||
|
import org.apache.activemq.artemis.cli.commands.Run;
|
||||||
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to validate that the CLI doesn't throw improper exceptions when invoked.
|
||||||
|
*/
|
||||||
|
public class MessageSerializerTest extends CliTestBase {
|
||||||
|
|
||||||
|
private Connection connection;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
setupAuth();
|
||||||
|
super.setup();
|
||||||
|
startServer();
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||||
|
connection = cf.createConnection("admin", "admin");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
} finally {
|
||||||
|
stopServer();
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupAuth() throws Exception {
|
||||||
|
setupAuth(temporaryFolder.getRoot());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupAuth(File folder) throws Exception {
|
||||||
|
System.setProperty("java.security.auth.login.config", folder.getAbsolutePath() + "/etc/login.config");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startServer() throws Exception {
|
||||||
|
File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
|
||||||
|
setupAuth(rootDirectory);
|
||||||
|
Run.setEmbedded(true);
|
||||||
|
Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login");
|
||||||
|
System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
|
||||||
|
Artemis.internalExecute("run");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopServer() throws Exception {
|
||||||
|
Artemis.internalExecute("stop");
|
||||||
|
assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS));
|
||||||
|
assertEquals(0, LibaioContext.getTotalMaxIO());
|
||||||
|
}
|
||||||
|
|
||||||
|
private File createMessageFile() throws IOException {
|
||||||
|
return temporaryFolder.newFile("messages.xml");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTextMessageImportExport() throws Exception {
|
||||||
|
String address = "test";
|
||||||
|
int noMessages = 10;
|
||||||
|
File file = createMessageFile();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
List<Message> sent = new ArrayList<>(noMessages);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
sent.add(session.createTextMessage(RandomUtil.randomString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
sendMessages(session, address, sent);
|
||||||
|
exportMessages(address, noMessages, file);
|
||||||
|
|
||||||
|
// Ensure there's nothing left to consume
|
||||||
|
MessageConsumer consumer = session.createConsumer(getDestination(address));
|
||||||
|
assertNull(consumer.receive(1000));
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
importMessages(address, file);
|
||||||
|
|
||||||
|
List<Message> received = consumeMessages(session, address, noMessages, false);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
assertEquals(((TextMessage) sent.get(i)).getText(), ((TextMessage) received.get(i)).getText());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObjectMessageImportExport() throws Exception {
|
||||||
|
String address = "test";
|
||||||
|
int noMessages = 10;
|
||||||
|
File file = createMessageFile();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
// Send initial messages.
|
||||||
|
List<Message> sent = new ArrayList<>(noMessages);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
sent.add(session.createObjectMessage(UUID.randomUUID()));
|
||||||
|
}
|
||||||
|
|
||||||
|
sendMessages(session, address, sent);
|
||||||
|
exportMessages(address, noMessages, file);
|
||||||
|
|
||||||
|
// Ensure there's nothing left to consume
|
||||||
|
MessageConsumer consumer = session.createConsumer(getDestination(address));
|
||||||
|
assertNull(consumer.receive(1000));
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
importMessages(address, file);
|
||||||
|
List<Message> received = consumeMessages(session, address, noMessages, false);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMapMessageImportExport() throws Exception {
|
||||||
|
String address = "test";
|
||||||
|
int noMessages = 10;
|
||||||
|
String key = "testKey";
|
||||||
|
File file = createMessageFile();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
List<Message> sent = new ArrayList<>(noMessages);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
MapMessage m = session.createMapMessage();
|
||||||
|
m.setString(key, RandomUtil.randomString());
|
||||||
|
sent.add(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
sendMessages(session, address, sent);
|
||||||
|
exportMessages(address, noMessages, file);
|
||||||
|
|
||||||
|
// Ensure there's nothing left to consume
|
||||||
|
MessageConsumer consumer = session.createConsumer(getDestination(address));
|
||||||
|
assertNull(consumer.receive(1000));
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
importMessages(address, file);
|
||||||
|
List<Message> received = consumeMessages(session, address, noMessages, false);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessages(Session session, String address, List<Message> messages) throws Exception {
|
||||||
|
MessageProducer producer = session.createProducer(getDestination(address));
|
||||||
|
for (Message m : messages) {
|
||||||
|
producer.send(m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessages(Session session, Destination destination, List<Message> messages) throws Exception {
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
for (Message m : messages) {
|
||||||
|
producer.send(m);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
|
||||||
|
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
List<Message> messages = new ArrayList<>();
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
Message m = consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
messages.add(m);
|
||||||
|
}
|
||||||
|
return messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void exportMessages(String address, int noMessages, File output) throws Exception {
|
||||||
|
Artemis.main("consumer",
|
||||||
|
"--user", "admin",
|
||||||
|
"--password", "admin",
|
||||||
|
"--destination", address,
|
||||||
|
"--message-count", "" + noMessages,
|
||||||
|
"--data", output.getAbsolutePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void importMessages(String address, File input) throws Exception {
|
||||||
|
Artemis.main("producer",
|
||||||
|
"--user", "admin",
|
||||||
|
"--password", "admin",
|
||||||
|
"--destination", address,
|
||||||
|
"--data", input.getAbsolutePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createQueue(String routingTypeOption, String address, String queueName) throws Exception {
|
||||||
|
Artemis.main("queue", "create",
|
||||||
|
"--user", "admin",
|
||||||
|
"--password", "admin",
|
||||||
|
"--address", address,
|
||||||
|
"--name", queueName,
|
||||||
|
routingTypeOption,
|
||||||
|
"--durable",
|
||||||
|
"--preserve-on-no-consumers",
|
||||||
|
"--auto-create-address");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendDirectToQueue() throws Exception {
|
||||||
|
|
||||||
|
String address = "test";
|
||||||
|
String queue1Name = "queue1";
|
||||||
|
String queue2Name = "queue2";
|
||||||
|
|
||||||
|
createQueue("--multicast", address, queue1Name);
|
||||||
|
createQueue("--multicast", address, queue2Name);
|
||||||
|
|
||||||
|
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
|
||||||
|
|
||||||
|
// send messages to queue
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Destination queue1 = session.createQueue(address + "::" + queue1Name);
|
||||||
|
Destination queue2 = session.createQueue(address + "::" + queue2Name);
|
||||||
|
|
||||||
|
MessageConsumer consumer1 = session.createConsumer(queue1);
|
||||||
|
MessageConsumer consumer2 = session.createConsumer(queue2);
|
||||||
|
|
||||||
|
Artemis.main("producer",
|
||||||
|
"--user", "admin",
|
||||||
|
"--password", "admin",
|
||||||
|
"--destination", "fqqn://" + address + "::" + queue1Name,
|
||||||
|
"--message-count", "5");
|
||||||
|
|
||||||
|
assertNull(consumer2.receive(1000));
|
||||||
|
assertNotNull(consumer1.receive(1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void exportFromFQQN() throws Exception {
|
||||||
|
String addr = "address";
|
||||||
|
String queue = "queue";
|
||||||
|
String fqqn = addr + "::" + queue;
|
||||||
|
String destination = "fqqn://" + fqqn;
|
||||||
|
|
||||||
|
File file = createMessageFile();
|
||||||
|
int noMessages = 10;
|
||||||
|
|
||||||
|
createQueue("--multicast", addr, queue);
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Topic topic = session.createTopic(addr);
|
||||||
|
|
||||||
|
List<Message> messages = new ArrayList<>(noMessages);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
messages.add(session.createTextMessage(RandomUtil.randomString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
sendMessages(session, topic, messages);
|
||||||
|
|
||||||
|
exportMessages(destination, noMessages, file);
|
||||||
|
importMessages(destination, file);
|
||||||
|
|
||||||
|
List<Message> recieved = consumeMessages(session, fqqn, noMessages, true);
|
||||||
|
for (int i = 0; i < noMessages; i++) {
|
||||||
|
assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//read individual lines from byteStream
|
||||||
|
private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
|
||||||
|
byte[] bytes;
|
||||||
|
|
||||||
|
if (errorOutput) {
|
||||||
|
bytes = context.getStdErrBytes();
|
||||||
|
} else {
|
||||||
|
bytes = context.getStdoutBytes();
|
||||||
|
}
|
||||||
|
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
|
||||||
|
ArrayList<String> lines = new ArrayList<>();
|
||||||
|
|
||||||
|
String currentLine = bufferedReader.readLine();
|
||||||
|
while (currentLine != null) {
|
||||||
|
lines.add(currentLine);
|
||||||
|
currentLine = bufferedReader.readLine();
|
||||||
|
}
|
||||||
|
|
||||||
|
return lines;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessages(Session session, String queueName, int messageCount) throws JMSException {
|
||||||
|
MessageProducer producer = session.createProducer(getDestination(queueName));
|
||||||
|
|
||||||
|
TextMessage message = session.createTextMessage(getTestMessageBody());
|
||||||
|
|
||||||
|
for (int i = 0; i < messageCount; i++) {
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getTestMessageBody() {
|
||||||
|
return "Sample Message";
|
||||||
|
}
|
||||||
|
|
||||||
|
private Destination getDestination(String queueName) {
|
||||||
|
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue