From 812776fca7ab7e029bdc5b46da87325d597fd723 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 2 May 2018 11:32:39 +0100 Subject: [PATCH 1/2] ARTEMIS-1840 Refactor XML Data Serialiser --- .../tools/xml/XMLMessageExporter.java | 148 ++++++++ .../tools/xml/XMLMessageImporter.java | 331 ++++++++++++++++++ .../commands/tools/xml/XmlDataConstants.java | 4 +- .../commands/tools/xml/XmlDataExporter.java | 117 +------ .../tools/xml/XmlDataExporterUtil.java | 2 +- .../commands/tools/xml/XmlDataImporter.java | 288 +-------------- 6 files changed, 507 insertions(+), 383 deletions(-) create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java new file mode 100644 index 0000000000..a2fbaddef8 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.reader.TextMessageUtil; + + +/** This is an Utility class that will import the outputs in XML format. */ +public class XMLMessageExporter { + + private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L; + + private XMLStreamWriter xmlWriter; + + public XMLMessageExporter(XMLStreamWriter xmlWriter) { + this.xmlWriter = xmlWriter; + } + + public XMLStreamWriter getRawXMLWriter() { + return xmlWriter; + } + + public void printSingleMessageAsXML(ICoreMessage message, List queues, boolean encodeTextUTF8) throws Exception { + xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); + printMessageAttributes(message); + printMessageProperties(message); + printMessageQueues(queues); + printMessageBody(message.toCore(), encodeTextUTF8); + xmlWriter.writeEndElement(); // end MESSAGES_CHILD + } + + public void printMessageBody(Message message, boolean encodeTextMessageUTF8) throws Exception { + xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); + + if (message.isLargeMessage()) { + printLargeMessageBody((LargeServerMessage) message); + } else { + if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) { + xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString()); + } else { + xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBodyBase64(message)); + } + } + xmlWriter.writeEndElement(); // end MESSAGE_BODY + } + + public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); + LargeBodyEncoder encoder = null; + + try { + encoder = message.toCore().getBodyEncoder(); + encoder.open(); + long totalBytesWritten = 0; + Long bufferSize; + long bodySize = encoder.getLargeBodySize(); + for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) { + Long remainder = bodySize - totalBytesWritten; + if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) { + bufferSize = LARGE_MESSAGE_CHUNK_SIZE; + } else { + bufferSize = remainder; + } + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue()); + encoder.encode(buffer, bufferSize.intValue()); + xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array())); + totalBytesWritten += bufferSize; + } + encoder.close(); + } catch (ActiveMQException e) { + e.printStackTrace(); + } finally { + if (encoder != null) { + try { + encoder.close(); + } catch (ActiveMQException e) { + e.printStackTrace(); + } + } + } + } + + public void printMessageQueues(List queues) throws XMLStreamException { + if (queues != null) { + xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT); + for (String queueName : queues) { + xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName); + } + xmlWriter.writeEndElement(); // end QUEUES_PARENT + } + } + + public void printMessageProperties(Message message) throws XMLStreamException { + xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT); + for (SimpleString key : message.getPropertyNames()) { + Object value = message.getObjectProperty(key); + xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD); + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString()); + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value)); + + // Write the property type as an attribute + String propertyType = XmlDataExporterUtil.getPropertyType(value); + if (propertyType != null) { + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType); + } + } + xmlWriter.writeEndElement(); // end PROPERTIES_PARENT + } + + public void printMessageAttributes(ICoreMessage message) throws XMLStreamException { + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp())); + String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType()); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType); + if (message.getUserID() != null) { + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString()); + } + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java new file mode 100644 index 0000000000..09e78d5657 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; + +/** This is an Utility class that will import the outputs in XML format. */ +public class XMLMessageImporter { + + private static final Logger logger = Logger.getLogger(XMLMessageImporter.class); + + private XMLStreamReader reader; + + private ClientSession session; + + Map oldPrefixTranslation = new HashMap<>(); + + public XMLMessageImporter(XMLStreamReader xmlStreamReader, ClientSession session) { + this.reader = xmlStreamReader; + this.session = session; + } + + public void setOldPrefixTranslation(Map oldPrefixTranslation) { + this.oldPrefixTranslation = oldPrefixTranslation; + } + + public XMLStreamReader getRawXMLReader() { + return reader; + } + + public MessageInfo readMessage(boolean decodeUTF8) throws Exception { + if (!reader.hasNext()) return null; + + Byte type = 0; + Byte priority = 0; + Long expiration = 0L; + Long timestamp = 0L; + Long id = 0L; + org.apache.activemq.artemis.utils.UUID userId = null; + ArrayList queues = new ArrayList<>(); + + // get message's attributes + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.MESSAGE_TYPE: + type = getMessageType(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_PRIORITY: + priority = Byte.parseByte(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_EXPIRATION: + expiration = Long.parseLong(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_TIMESTAMP: + timestamp = Long.parseLong(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_USER_ID: + userId = UUIDGenerator.getInstance().generateUUID(); + break; + case XmlDataConstants.MESSAGE_ID: + id = Long.parseLong(reader.getAttributeValue(i)); + break; + } + } + + Message message = session.createMessage(type, true, expiration, timestamp, priority); + + message.setUserID(userId); + + boolean endLoop = false; + + File largeMessageTemporaryFile = null; + // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) + while (reader.hasNext()) { + int eventType = reader.getEventType(); + switch (eventType) { + case XMLStreamConstants.START_ELEMENT: + if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { + largeMessageTemporaryFile = processMessageBody(message.toCore(), decodeUTF8); + } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { + processMessageProperties(message); + } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { + processMessageQueues(queues); + } + break; + case XMLStreamConstants.END_ELEMENT: + if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { + endLoop = true; + } + break; + } + if (endLoop) { + break; + } + reader.next(); + } + return new MessageInfo(id, queues, message, largeMessageTemporaryFile); + } + + private Byte getMessageType(String value) { + Byte type = Message.DEFAULT_TYPE; + switch (value) { + case XmlDataConstants.DEFAULT_TYPE_PRETTY: + type = Message.DEFAULT_TYPE; + break; + case XmlDataConstants.BYTES_TYPE_PRETTY: + type = Message.BYTES_TYPE; + break; + case XmlDataConstants.MAP_TYPE_PRETTY: + type = Message.MAP_TYPE; + break; + case XmlDataConstants.OBJECT_TYPE_PRETTY: + type = Message.OBJECT_TYPE; + break; + case XmlDataConstants.STREAM_TYPE_PRETTY: + type = Message.STREAM_TYPE; + break; + case XmlDataConstants.TEXT_TYPE_PRETTY: + type = Message.TEXT_TYPE; + break; + } + return type; + } + + private void processMessageQueues(ArrayList queues) { + for (int i = 0; i < reader.getAttributeCount(); i++) { + if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) { + String queueName = reader.getAttributeValue(i); + String translation = checkPrefix(queueName); + queues.add(translation); + } + } + } + + private String checkPrefix(String queueName) { + String newQueueName = oldPrefixTranslation.get(queueName); + if (newQueueName == null) { + newQueueName = queueName; + } + return newQueueName; + } + + private void processMessageProperties(Message message) { + String key = ""; + String value = ""; + String propertyType = ""; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.PROPERTY_NAME: + key = reader.getAttributeValue(i); + break; + case XmlDataConstants.PROPERTY_VALUE: + value = reader.getAttributeValue(i); + break; + case XmlDataConstants.PROPERTY_TYPE: + propertyType = reader.getAttributeValue(i); + break; + } + } + + if (value.equals(XmlDataConstants.NULL)) { + value = null; + } + + switch (propertyType) { + case XmlDataConstants.PROPERTY_TYPE_SHORT: + message.putShortProperty(key, Short.parseShort(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BOOLEAN: + message.putBooleanProperty(key, Boolean.parseBoolean(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BYTE: + message.putByteProperty(key, Byte.parseByte(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BYTES: + message.putBytesProperty(key, value == null ? null : decode(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_DOUBLE: + message.putDoubleProperty(key, Double.parseDouble(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_FLOAT: + message.putFloatProperty(key, Float.parseFloat(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_INTEGER: + message.putIntProperty(key, Integer.parseInt(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_LONG: + message.putLongProperty(key, Long.parseLong(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING: + message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_STRING: + message.putStringProperty(key, value); + break; + } + } + + private File processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException { + File tempFileName = null; + boolean isLarge = false; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) { + isLarge = Boolean.parseBoolean(reader.getAttributeValue(i)); + } + } + reader.next(); + if (logger.isDebugEnabled()) { + logger.debug("XMLStreamReader impl: " + reader); + } + if (isLarge) { + tempFileName = File.createTempFile("largeMessage", ".tmp"); + if (logger.isDebugEnabled()) { + logger.debug("Creating temp file " + tempFileName + " for large message."); + } + try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) { + getMessageBodyBytes(bytes -> out.write(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage); + } + FileInputStream fileInputStream = new FileInputStream(tempFileName); + BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); + ((ClientMessage) message).setBodyInputStream(bufferedInput); + } else { + getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage); + } + + return tempFileName; + } + + /** + * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't + * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need + * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each + * CDATA has to be decoded in its entirety. + * + * @param processor used to deal with the decoded CDATA elements + * @param textMessage If this a text message we decode UTF8 and encode as a simple string + */ + private void getMessageBodyBytes(MessageBodyBytesProcessor processor, boolean decodeTextMessage) throws IOException, XMLStreamException { + int currentEventType; + StringBuilder cdata = new StringBuilder(); + while (reader.hasNext()) { + currentEventType = reader.getEventType(); + if (currentEventType == XMLStreamConstants.END_ELEMENT) { + break; + } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) { + /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to + * the processor, and reset the cdata for the next event(s) + */ + if (decodeTextMessage) { + SimpleString text = new SimpleString(cdata.toString()); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(SimpleString.sizeofNullableString(text)); + SimpleString.writeNullableSimpleString(byteBuf, text); + byte[] bytes = new byte[SimpleString.sizeofNullableString(text)]; + byteBuf.readBytes(bytes); + processor.processBodyBytes(bytes); + } else { + processor.processBodyBytes(decode(cdata.toString())); + cdata.setLength(0); + } + } else { + cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim()); + } + reader.next(); + } + } + + private static byte[] decode(String data) { + return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + private interface MessageBodyBytesProcessor { + void processBodyBytes(byte[] bytes) throws IOException; + } + + public class MessageInfo { + public long id; + public List queues; + public Message message; + public File tempFile; + + MessageInfo(long id, List queues, Message message, File tempFile) { + this.message = message; + this.queues = queues; + this.id = id; + this.tempFile = tempFile; + } + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java index 24e56b2e58..a5a7c97d36 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java @@ -26,7 +26,7 @@ public final class XmlDataConstants { // Utility } - static final String XML_VERSION = "1.0"; + public static final String XML_VERSION = "1.0"; static final String DOCUMENT_PARENT = "activemq-journal"; static final String BINDINGS_PARENT = "bindings"; @@ -50,7 +50,7 @@ public final class XmlDataConstants { static final String ADDRESS_BINDING_ID = "id"; static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types"; - static final String MESSAGES_PARENT = "messages"; + public static final String MESSAGES_PARENT = "messages"; static final String MESSAGES_CHILD = "message"; static final String MESSAGE_ID = "id"; static final String MESSAGE_PRIORITY = "priority"; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java index 85eac4a940..0125cb75a5 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java @@ -36,7 +36,6 @@ import java.util.TreeMap; import io.airlift.airline.Command; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -48,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; @@ -66,12 +64,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; -import org.apache.activemq.artemis.core.server.LargeServerMessage; @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends DBOption { - private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L; private XMLStreamWriter xmlWriter; // an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID @@ -92,6 +88,8 @@ public final class XmlDataExporter extends DBOption { long bindingsPrinted = 0L; + XMLMessageExporter exporter; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ -141,7 +139,7 @@ public final class XmlDataExporter extends DBOption { XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8"); PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter); xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler); - + exporter = new XMLMessageExporter(xmlWriter); writeXMLData(); } @@ -317,7 +315,7 @@ public final class XmlDataExporter extends DBOption { private void printDataAsXML() { try { - xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION); + xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT); printBindingsAsXML(); printAllMessagesAsXML(); @@ -375,6 +373,10 @@ public final class XmlDataExporter extends DBOption { xmlWriter.writeEndElement(); // end "messages" } + private void printSingleMessageAsXML(ICoreMessage message, List queues) throws Exception { + exporter.printSingleMessageAsXML(message, queues, false); + messagesPrinted++; + } /** * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions * from the journal). @@ -444,104 +446,9 @@ public final class XmlDataExporter extends DBOption { } } - private void printSingleMessageAsXML(ICoreMessage message, List queues) throws Exception { - xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); - printMessageAttributes(message); - printMessageProperties(message); - printMessageQueues(queues); - printMessageBody(message.toCore()); - xmlWriter.writeEndElement(); // end MESSAGES_CHILD - messagesPrinted++; - } - - private void printMessageBody(Message message) throws Exception { - xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); - - if (message.toCore().isLargeMessage()) { - printLargeMessageBody((LargeServerMessage) message); - } else { - xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message)); - } - xmlWriter.writeEndElement(); // end MESSAGE_BODY - } - - private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); - LargeBodyEncoder encoder = null; - - try { - encoder = message.toCore().getBodyEncoder(); - encoder.open(); - long totalBytesWritten = 0; - Long bufferSize; - long bodySize = encoder.getLargeBodySize(); - for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) { - Long remainder = bodySize - totalBytesWritten; - if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) { - bufferSize = LARGE_MESSAGE_CHUNK_SIZE; - } else { - bufferSize = remainder; - } - ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue()); - encoder.encode(buffer, bufferSize.intValue()); - xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array())); - totalBytesWritten += bufferSize; - } - encoder.close(); - } catch (ActiveMQException e) { - e.printStackTrace(); - } finally { - if (encoder != null) { - try { - encoder.close(); - } catch (ActiveMQException e) { - e.printStackTrace(); - } - } - } - } - - private void printMessageQueues(List queues) throws XMLStreamException { - xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT); - for (String queueName : queues) { - xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD); - xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName); - } - xmlWriter.writeEndElement(); // end QUEUES_PARENT - } - - private void printMessageProperties(Message message) throws XMLStreamException { - xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT); - for (SimpleString key : message.getPropertyNames()) { - Object value = message.getObjectProperty(key); - xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD); - xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString()); - xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value)); - - // Write the property type as an attribute - String propertyType = XmlDataExporterUtil.getPropertyType(value); - if (propertyType != null) { - xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType); - } - } - xmlWriter.writeEndElement(); // end PROPERTIES_PARENT - } - - private void printMessageAttributes(ICoreMessage message) throws XMLStreamException { - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp())); - String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType()); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType); - if (message.getUserID() != null) { - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString()); - } - } - - private List extractQueueNames(HashMap refMap) { + private List extractQueueNames(HashMap refMap) { List queues = new ArrayList<>(); - for (ReferenceDescribe ref : refMap.values()) { + for (DescribeJournal.ReferenceDescribe ref : refMap.values()) { queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString()); } return queues; @@ -552,7 +459,7 @@ public final class XmlDataExporter extends DBOption { /** * Proxy to handle indenting the XML since javax.xml.stream.XMLStreamWriter doesn't support that. */ - static class PrettyPrintHandler implements InvocationHandler { + public static class PrettyPrintHandler implements InvocationHandler { private final XMLStreamWriter target; @@ -564,7 +471,7 @@ public final class XmlDataExporter extends DBOption { boolean wrap = true; - PrettyPrintHandler(XMLStreamWriter target) { + public PrettyPrintHandler(XMLStreamWriter target) { this.target = target; } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java index df48dcf471..7e6545cec9 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java @@ -88,7 +88,7 @@ public class XmlDataExporterUtil { /** * Base64 encode a ServerMessage body into the proper XML format */ - static String encodeMessageBody(final Message message) throws Exception { + static String encodeMessageBodyBase64(final Message message) throws Exception { Preconditions.checkNotNull(message, "ServerMessage can not be null"); ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer(); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java index 595ed557b0..bec2fbdfad 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -19,25 +19,18 @@ package org.apache.activemq.artemis.cli.commands.tools.xml; import javax.xml.XMLConstants; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamConstants; -import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamReader; import javax.xml.transform.stax.StAXSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; import javax.xml.validation.Validator; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.URL; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.ArrayList; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -47,7 +40,6 @@ import java.util.TreeSet; import io.airlift.airline.Command; import io.airlift.airline.Option; -import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -68,10 +60,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ListUtil; -import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; /** @@ -86,6 +76,8 @@ public final class XmlDataImporter extends ActionAbstract { private XMLStreamReader reader; + private XMLMessageImporter messageReader; + // this session is really only needed if the "session" variable does not auto-commit sends ClientSession managementSession; @@ -123,7 +115,7 @@ public final class XmlDataImporter extends ActionAbstract { @Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports") public boolean legacyPrefixes = false; - TreeSet messages; + TreeSet messages; public String getPassword() { return password; @@ -179,6 +171,9 @@ public final class XmlDataImporter extends ActionAbstract { ClientSession session, ClientSession managementSession) throws Exception { reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); + messageReader = new XMLMessageImporter(reader, session); + messageReader.setOldPrefixTranslation(oldPrefixTranslation); + this.session = session; if (managementSession != null) { this.managementSession = managementSession; @@ -237,9 +232,9 @@ public final class XmlDataImporter extends ActionAbstract { private void processXml() throws Exception { if (sort) { - messages = new TreeSet(new Comparator() { + messages = new TreeSet(new Comparator() { @Override - public int compare(MessageTemp o1, MessageTemp o2) { + public int compare(XMLMessageImporter.MessageInfo o1, XMLMessageImporter.MessageInfo o2) { if (o1.id == o2.id) { return 0; } else if (o1.id > o2.id) { @@ -270,8 +265,8 @@ public final class XmlDataImporter extends ActionAbstract { } if (sort) { - for (MessageTemp msgtmp : messages) { - sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName); + for (XMLMessageImporter.MessageInfo msgtmp : messages) { + sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFile); } } @@ -288,118 +283,14 @@ public final class XmlDataImporter extends ActionAbstract { } private void processMessage() throws Exception { - Byte type = 0; - Byte priority = 0; - Long expiration = 0L; - Long timestamp = 0L; - Long id = 0L; - org.apache.activemq.artemis.utils.UUID userId = null; - ArrayList queues = new ArrayList<>(); - - // get message's attributes - for (int i = 0; i < reader.getAttributeCount(); i++) { - String attributeName = reader.getAttributeLocalName(i); - switch (attributeName) { - case XmlDataConstants.MESSAGE_TYPE: - type = getMessageType(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_PRIORITY: - priority = Byte.parseByte(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_EXPIRATION: - expiration = Long.parseLong(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_TIMESTAMP: - timestamp = Long.parseLong(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_USER_ID: - userId = UUIDGenerator.getInstance().generateUUID(); - break; - case XmlDataConstants.MESSAGE_ID: - id = Long.parseLong(reader.getAttributeValue(i)); - break; - } - } - - Message message = session.createMessage(type, true, expiration, timestamp, priority); - message.setUserID(userId); - - boolean endLoop = false; - - File largeMessageTemporaryFile = null; - // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) - while (reader.hasNext()) { - int eventType = reader.getEventType(); - switch (eventType) { - case XMLStreamConstants.START_ELEMENT: - if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { - largeMessageTemporaryFile = processMessageBody(message.toCore()); - } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { - processMessageProperties(message); - } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { - processMessageQueues(queues); - } - break; - case XMLStreamConstants.END_ELEMENT: - if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { - endLoop = true; - } - break; - } - if (endLoop) { - break; - } - reader.next(); - } - + XMLMessageImporter.MessageInfo info = messageReader.readMessage(false); if (sort) { - messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile)); + messages.add(info); } else { - sendMessage(queues, message, largeMessageTemporaryFile); + sendMessage(info.queues, info.message, info.tempFile); } } - - class MessageTemp { - long id; - List queues; - Message message; - File tempFileName; - - MessageTemp(long id, List queues, Message message, File tempFileName) { - this.message = message; - this.queues = queues; - this.message = message; - this.id = id; - this.tempFileName = tempFileName; - } - } - - private Byte getMessageType(String value) { - Byte type = Message.DEFAULT_TYPE; - switch (value) { - case XmlDataConstants.DEFAULT_TYPE_PRETTY: - type = Message.DEFAULT_TYPE; - break; - case XmlDataConstants.BYTES_TYPE_PRETTY: - type = Message.BYTES_TYPE; - break; - case XmlDataConstants.MAP_TYPE_PRETTY: - type = Message.MAP_TYPE; - break; - case XmlDataConstants.OBJECT_TYPE_PRETTY: - type = Message.OBJECT_TYPE; - break; - case XmlDataConstants.STREAM_TYPE_PRETTY: - type = Message.STREAM_TYPE; - break; - case XmlDataConstants.TEXT_TYPE_PRETTY: - type = Message.TEXT_TYPE; - break; - } - return type; - } - private void sendMessage(List queues, Message message, File tempFileName) throws Exception { StringBuilder logMessage = new StringBuilder(); String destination = addressMap.get(queues.get(0)); @@ -460,153 +351,6 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageQueues(ArrayList queues) { - for (int i = 0; i < reader.getAttributeCount(); i++) { - if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) { - String queueName = reader.getAttributeValue(i); - String translation = checkPrefix(queueName); - queues.add(translation); - } - } - } - - private String checkPrefix(String queueName) { - String newQueueName = oldPrefixTranslation.get(queueName); - if (newQueueName == null) { - newQueueName = queueName; - } - return newQueueName; - } - - private void processMessageProperties(Message message) { - String key = ""; - String value = ""; - String propertyType = ""; - - for (int i = 0; i < reader.getAttributeCount(); i++) { - String attributeName = reader.getAttributeLocalName(i); - switch (attributeName) { - case XmlDataConstants.PROPERTY_NAME: - key = reader.getAttributeValue(i); - break; - case XmlDataConstants.PROPERTY_VALUE: - value = reader.getAttributeValue(i); - break; - case XmlDataConstants.PROPERTY_TYPE: - propertyType = reader.getAttributeValue(i); - break; - } - } - - if (value.equals(XmlDataConstants.NULL)) { - value = null; - } - - switch (propertyType) { - case XmlDataConstants.PROPERTY_TYPE_SHORT: - message.putShortProperty(key, Short.parseShort(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_BOOLEAN: - message.putBooleanProperty(key, Boolean.parseBoolean(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_BYTE: - message.putByteProperty(key, Byte.parseByte(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_BYTES: - message.putBytesProperty(key, value == null ? null : decode(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_DOUBLE: - message.putDoubleProperty(key, Double.parseDouble(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_FLOAT: - message.putFloatProperty(key, Float.parseFloat(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_INTEGER: - message.putIntProperty(key, Integer.parseInt(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_LONG: - message.putLongProperty(key, Long.parseLong(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING: - message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_STRING: - message.putStringProperty(key, value); - break; - } - } - - private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { - File tempFileName = null; - boolean isLarge = false; - - for (int i = 0; i < reader.getAttributeCount(); i++) { - String attributeName = reader.getAttributeLocalName(i); - if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) { - isLarge = Boolean.parseBoolean(reader.getAttributeValue(i)); - } - } - reader.next(); - if (logger.isDebugEnabled()) { - logger.debug("XMLStreamReader impl: " + reader); - } - if (isLarge) { - tempFileName = File.createTempFile("largeMessage", ".tmp"); - if (logger.isDebugEnabled()) { - logger.debug("Creating temp file " + tempFileName + " for large message."); - } - try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) { - getMessageBodyBytes(new MessageBodyBytesProcessor() { - @Override - public void processBodyBytes(byte[] bytes) throws IOException { - out.write(bytes); - } - }); - } - FileInputStream fileInputStream = new FileInputStream(tempFileName); - BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); - ((ClientMessage) message).setBodyInputStream(bufferedInput); - } else { - getMessageBodyBytes(new MessageBodyBytesProcessor() { - @Override - public void processBodyBytes(byte[] bytes) throws IOException { - message.getBodyBuffer().writeBytes(bytes); - } - }); - } - - return tempFileName; - } - - /** - * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't - * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need - * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each - * CDATA has to be decoded in its entirety. - * - * @param processor used to deal with the decoded CDATA elements - */ - private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException { - int currentEventType; - StringBuilder cdata = new StringBuilder(); - while (reader.hasNext()) { - currentEventType = reader.getEventType(); - if (currentEventType == XMLStreamConstants.END_ELEMENT) { - break; - } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) { - /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to - * the processor, and reset the cdata for the next event(s) - */ - processor.processBodyBytes(decode(cdata.toString())); - cdata.setLength(0); - } else { - cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim()); - } - reader.next(); - } - } - - private void oldBinding() throws Exception { String queueName = ""; String address = ""; @@ -762,11 +506,5 @@ public final class XmlDataImporter extends ActionAbstract { } } - private static byte[] decode(String data) { - return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); - } - private interface MessageBodyBytesProcessor { - void processBodyBytes(byte[] bytes) throws IOException; - } } From 64ce26e7cc06c0e5779430a8126128d11da561bd Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 2 May 2018 11:35:17 +0100 Subject: [PATCH 2/2] ARTEMIS-1840 Added FQQN Import/Export Live Broker --- .../cli/commands/messages/Consumer.java | 57 ++- .../cli/commands/messages/ConsumerThread.java | 73 ++-- .../cli/commands/messages/DestAbstract.java | 96 +++++ .../cli/commands/messages/Producer.java | 127 ++++-- .../cli/commands/messages/ProducerThread.java | 11 + .../factory/serialize/MessageSerializer.java | 37 ++ .../serialize/XMLMessageSerializer.java | 118 ++++++ .../cli/test/MessageSerializerTest.java | 362 ++++++++++++++++++ 8 files changed, 826 insertions(+), 55 deletions(-) create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java create mode 100644 artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java index ee15a66a90..856e82b970 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java @@ -20,11 +20,16 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageListener; import javax.jms.Session; +import java.io.FileOutputStream; +import java.io.OutputStream; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer; @Command(name = "consumer", description = "It will consume messages from an instance") public class Consumer extends DestAbstract { @@ -41,6 +46,9 @@ public class Consumer extends DestAbstract { @Option(name = "--filter", description = "filter to be used with the consumer") String filter; + @Option(name = "--data", description = "serialize the messages to the specified file as they are consumed") + String file; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ -49,7 +57,34 @@ public class Consumer extends DestAbstract { ConnectionFactory factory = createConnectionFactory(); + SerialiserMessageListener listener = null; + MessageSerializer messageSerializer = null; + if (file != null) { + try { + String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER : serializer; + if (className.equals(DEFAULT_MESSAGE_SERIALIZER) && !protocol.equalsIgnoreCase("CORE")) { + System.err.println("Default Serializer does not support: " + protocol + " protocol"); + return null; + } + messageSerializer = (MessageSerializer) Class.forName(className).getConstructor().newInstance(); + } catch (Exception e) { + System.err.println("Error. Unable to instantiate serializer class: " + serializer); + return null; + } + + try { + OutputStream out = new FileOutputStream(file); + listener = new SerialiserMessageListener(messageSerializer, out); + } catch (Exception e) { + System.err.println("Error: Unable to open file for writing\n" + e.getMessage()); + return null; + } + } + + if (messageSerializer != null) messageSerializer.start(); + try (Connection connection = factory.createConnection()) { + // We read messages in a single thread when persisting to file. ConsumerThread[] threadsArray = new ConsumerThread[threads]; for (int i = 0; i < threads; i++) { Session session; @@ -58,10 +93,13 @@ public class Consumer extends DestAbstract { } else { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - Destination dest = lookupDestination(session); + + // Do validation on FQQN + Destination dest = isFQQN() ? session.createQueue(getFQQNFromDestination(destination)) : lookupDestination(session); threadsArray[i] = new ConsumerThread(session, dest, i); - threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false); + threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull) + .setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener); } for (ConsumerThread thread : threadsArray) { @@ -77,9 +115,24 @@ public class Consumer extends DestAbstract { received += thread.getReceived(); } + if (messageSerializer != null) messageSerializer.stop(); + return received; } } + private class SerialiserMessageListener implements MessageListener { + private MessageSerializer messageSerializer; + + SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception { + this.messageSerializer = messageSerializer; + this.messageSerializer.setOutput(outputStream); + } + + @Override + public void onMessage(Message message) { + messageSerializer.write(message); + } + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java index ab3640bd62..9fbff81ec7 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java @@ -21,6 +21,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueBrowser; @@ -50,6 +51,7 @@ public class ConsumerThread extends Thread { boolean running = false; CountDownLatch finished; boolean bytesAsText; + MessageListener listener; public ConsumerThread(Session session, Destination destination, int threadNr) { super("Consumer " + destination.toString() + ", thread=" + threadNr); @@ -66,6 +68,43 @@ public class ConsumerThread extends Thread { } } + private void handle(Message msg, boolean browse) throws JMSException { + if (listener != null) { + listener.onMessage(msg); + } else { + if (browse) { + if (verbose) { + System.out.println("..." + msg); + } + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Message:" + msg); + } + } else { + if (verbose) { + if (bytesAsText && (msg instanceof BytesMessage)) { + long length = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int) length]; + ((BytesMessage) msg).readBytes(bytes); + System.out.println("Received a message with " + bytes.length); + } + + if (msg instanceof TextMessage) { + String text = ((TextMessage) msg).getText(); + System.out.println("Received text sized at " + text.length()); + } + + if (msg instanceof ObjectMessage) { + Object obj = ((ObjectMessage) msg).getObject(); + System.out.println("Received object " + obj.toString().length()); + } + } + } + } + } + public void browse() { running = true; QueueBrowser consumer = null; @@ -83,16 +122,7 @@ public class ConsumerThread extends Thread { Message msg = enumBrowse.nextElement(); if (msg != null) { System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); - - if (verbose) { - System.out.println("..." + msg); - } - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Message:" + msg); - } + handle(msg, true); received++; if (received >= messageCount) { @@ -158,24 +188,7 @@ public class ConsumerThread extends Thread { System.out.println("Received " + count); } } - if (verbose) { - if (bytesAsText && (msg instanceof BytesMessage)) { - long length = ((BytesMessage) msg).getBodyLength(); - byte[] bytes = new byte[(int) length]; - ((BytesMessage) msg).readBytes(bytes); - System.out.println("Received a message with " + bytes.length); - } - - if (msg instanceof TextMessage) { - String text = ((TextMessage) msg).getText(); - System.out.println("Received text sized at " + text.length()); - } - - if (msg instanceof ObjectMessage) { - Object obj = ((ObjectMessage) msg).getObject(); - System.out.println("Received object " + obj.toString().length()); - } - } + handle(msg, false); received++; } else { if (breakOnNull) { @@ -334,4 +347,8 @@ public class ConsumerThread extends Thread { this.browse = browse; return this; } + + public void setListener(MessageListener listener) { + this.listener = listener; + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java index 2f4a34c72e..63b5f17913 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java @@ -19,12 +19,30 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Destination; import javax.jms.Session; +import java.nio.ByteBuffer; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer; +import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; public class DestAbstract extends ConnectionAbstract { + public static final String DEFAULT_MESSAGE_SERIALIZER = "org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer"; + + private static final String FQQN_PREFIX = "fqqn://"; + + private static final String FQQN_SEPERATOR = "::"; + @Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// (Default: queue://TEST)") String destination = "queue://TEST"; @@ -40,6 +58,25 @@ public class DestAbstract extends ConnectionAbstract { @Option(name = "--threads", description = "Number of Threads to be used (Default: 1)") int threads = 1; + @Option(name = "--serializer", description = "Override the default serializer with a custom implementation") + String serializer; + + protected boolean isFQQN() throws ActiveMQException { + boolean fqqn = destination.contains("::"); + if (fqqn) { + if (!destination.startsWith("fqqn://")) { + throw new ActiveMQException("FQQN destinations must start with the fqqn:// prefix"); + } + + if (protocol.equalsIgnoreCase("AMQP")) { + throw new ActiveMQException("Sending to FQQN destinations is not support via AMQP protocol"); + } + return true; + } else { + return false; + } + } + protected Destination lookupDestination(Session session) throws Exception { if (protocol.equals("AMQP")) { return session.createQueue(destination); @@ -48,4 +85,63 @@ public class DestAbstract extends ConnectionAbstract { } } + protected MessageSerializer getMessageSerializer() { + if (serializer == null) return new XMLMessageSerializer(); + try { + return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance(); + } catch (Exception e) { + System.out.println("Error: unable to instantiate serializer class: " + serializer); + System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER); + } + return new XMLMessageSerializer(); + } + + // FIXME We currently do not support producing to FQQN. This is a work around. + private ClientSession getManagementSession() throws Exception { + ServerLocator serverLocator = ActiveMQClient.createServerLocator(brokerURL); + ClientSessionFactory sf = serverLocator.createSessionFactory(); + + ClientSession managementSession; + if (user != null || password != null) { + managementSession = sf.createSession(user, password, false, true, true, false, 0); + } else { + managementSession = sf.createSession(false, true, true); + } + return managementSession; + } + + public byte[] getQueueIdFromName(String queueName) throws Exception { + ClientMessage message = getQueueAttribute(queueName, "ID"); + Number idObject = (Number) ManagementHelper.getResult(message); + ByteBuffer byteBuffer = ByteBuffer.allocate(8); + byteBuffer.putLong(idObject.longValue()); + return byteBuffer.array(); + } + + protected ClientMessage getQueueAttribute(String queueName, String attribute) throws Exception { + ClientSession managementSession = getManagementSession(); + managementSession.start(); + + try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) { + ClientMessage managementMessage = managementSession.createMessage(false); + ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queueName, attribute); + managementSession.start(); + ClientMessage reply = requestor.request(managementMessage); + return reply; + } finally { + managementSession.stop(); + } + } + + protected String getQueueFromFQQN(String fqqn) { + return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) + FQQN_SEPERATOR.length()); + } + + protected String getAddressFromFQQN(String fqqn) { + return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(), fqqn.indexOf(FQQN_SEPERATOR)); + } + + protected String getFQQNFromDestination(String destination) { + return destination.substring(destination.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length()); + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java index 3cb5effb04..0936578b1f 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java @@ -19,12 +19,21 @@ package org.apache.activemq.artemis.cli.commands.messages; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.Session; +import java.io.FileInputStream; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; @Command(name = "producer", description = "It will send messages to an instance") public class Producer extends DestAbstract { @@ -49,6 +58,9 @@ public class Producer extends DestAbstract { @Option(name = "--group", description = "Message Group to be used") String msgGroupID = null; + @Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.") + String fileName = null; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ -56,35 +68,100 @@ public class Producer extends DestAbstract { ConnectionFactory factory = createConnectionFactory(); try (Connection connection = factory.createConnection()) { - ProducerThread[] threadsArray = new ProducerThread[threads]; - for (int i = 0; i < threads; i++) { - Session session; - if (txBatchSize > 0) { - session = connection.createSession(true, Session.SESSION_TRANSACTED); - } else { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + byte[] queueId = null; + boolean isFQQN = isFQQN(); + if (isFQQN) { + queueId = getQueueIdFromName(getQueueFromFQQN(destination)); + } + + // If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation. + if (fileName != null) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination dest = lookupDestination(session, isFQQN); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + int messageCount = 0; + try { + MessageSerializer serializer = getMessageSerializer(); + serializer.setInput(new FileInputStream(fileName), session); + serializer.start(); + + Message message = serializer.read(); + + while (message != null) { + if (queueId != null) ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId); + producer.send(message); + message = serializer.read(); + messageCount++; + } + + session.commit(); + serializer.stop(); + } catch (Exception e) { + System.err.println("Error occurred during import. Rolling back."); + session.rollback(); + e.printStackTrace(); + return 0; } - Destination dest = lookupDestination(session); - threadsArray[i] = new ProducerThread(session, dest, i); + System.out.println("Sent " + messageCount + " Messages."); + return messageCount; + } else { + ProducerThread[] threadsArray = new ProducerThread[threads]; + for (int i = 0; i < threads; i++) { + Session session; + if (txBatchSize > 0) { + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + Destination dest = lookupDestination(session, isFQQN); + threadsArray[i] = new ProducerThread(session, dest, i); - threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). - setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize). - setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). - setMessageCount(messageCount); + threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent). + setMessageSize(messageSize).setTextMessageSize(textMessageSize).setObjectSize(objectSize). + setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize). + setMessageCount(messageCount).setQueueId(queueId); + } + + for (ProducerThread thread : threadsArray) { + thread.start(); + } + + int messagesProduced = 0; + for (ProducerThread thread : threadsArray) { + thread.join(); + messagesProduced += thread.getSentCount(); + } + return messagesProduced; } - - for (ProducerThread thread : threadsArray) { - thread.start(); - } - - int messagesProduced = 0; - for (ProducerThread thread : threadsArray) { - thread.join(); - messagesProduced += thread.getSentCount(); - } - - return messagesProduced; } } + public Destination lookupDestination(Session session, boolean isFQQN) throws Exception { + Destination dest; + if (!isFQQN) { + dest = lookupDestination(session); + } else { + String address = getAddressFromFQQN(destination); + if (isFQQNAnycast(getQueueFromFQQN(destination))) { + String queue = getQueueFromFQQN(destination); + if (!queue.equals(address)) { + throw new ActiveMQException("FQQN support is limited to Anycast queues where the queue name equals the address."); + } + dest = session.createQueue(address); + } else { + dest = session.createTopic(address); + } + } + return dest; + } + + protected boolean isFQQNAnycast(String queueName) throws Exception { + ClientMessage message = getQueueAttribute(queueName, "RoutingType"); + String routingType = (String) ManagementHelper.getResult(message); + return routingType.equalsIgnoreCase("anycast"); + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java index 6e9fc5c4d6..58a57ef0e8 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ProducerThread.java @@ -30,6 +30,7 @@ import java.io.InputStreamReader; import java.net.URL; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.utils.ReusableLatch; public class ProducerThread extends Thread { @@ -48,6 +49,7 @@ public class ProducerThread extends Thread { long msgTTL = 0L; String msgGroupID = null; int transactionBatchSize; + byte[] queueId = null; int transactions = 0; final AtomicInteger sentCount = new AtomicInteger(0); @@ -121,6 +123,11 @@ public class ProducerThread extends Thread { private void sendMessage(MessageProducer producer, String threadName) throws Exception { Message message = createMessage(sentCount.get(), threadName); + + if (queueId != null) { + ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId); + } + producer.send(message); if (verbose) { System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID())); @@ -370,4 +377,8 @@ public class ProducerThread extends Thread { this.objectSize = objectSize; return this; } + + public void setQueueId(byte[] queueId) { + this.queueId = queueId; + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java new file mode 100644 index 0000000000..920790880e --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/MessageSerializer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.factory.serialize; + +import javax.jms.Message; +import javax.jms.Session; +import java.io.InputStream; +import java.io.OutputStream; + +public interface MessageSerializer { + + Message read() throws Exception; + + void write(Message message); + + void setOutput(OutputStream out) throws Exception; + + void setInput(InputStream in, Session session) throws Exception; + + void start() throws Exception; + + void stop() throws Exception; +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java new file mode 100644 index 0000000000..e353b94240 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/factory/serialize/XMLMessageSerializer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.factory.serialize; + +import javax.jms.Message; +import javax.jms.Session; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Proxy; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageExporter; +import org.apache.activemq.artemis.cli.commands.tools.xml.XMLMessageImporter; +import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants; +import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQSession; + +public class XMLMessageSerializer implements MessageSerializer { + + private XMLMessageExporter writer; + + private XMLMessageImporter reader; + + private ClientSession clientSession; + + private OutputStream out; + + @Override + public synchronized Message read() throws Exception { + reader.getRawXMLReader().nextTag(); + + // End of document. + if (reader.getRawXMLReader().getLocalName().equals("messages")) return null; + + XMLMessageImporter.MessageInfo messageInfo = reader.readMessage(true); + if (messageInfo == null) return null; + + // This is a large message + ActiveMQMessage jmsMessage = new ActiveMQMessage((ClientMessage) messageInfo.message, clientSession); + if (messageInfo.tempFile != null) { + jmsMessage.setInputStream(new FileInputStream(messageInfo.tempFile)); + } + return jmsMessage; + } + + @Override + public synchronized void write(Message message) { + try { + ICoreMessage core = ((ActiveMQMessage) message).getCoreMessage(); + writer.printSingleMessageAsXML(core, null, true); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void setOutput(OutputStream outputStream) throws Exception { + this.out = outputStream; + XMLOutputFactory factory = XMLOutputFactory.newInstance(); + XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(outputStream, "UTF-8"); + XmlDataExporter.PrettyPrintHandler handler = new XmlDataExporter.PrettyPrintHandler(rawXmlWriter); + XMLStreamWriter xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler); + this.writer = new XMLMessageExporter(xmlWriter); + } + + @Override + public void setInput(InputStream inputStream, Session session) throws Exception { + XMLStreamReader streamReader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); + this.clientSession = ((ActiveMQSession) session).getCoreSession(); + this.reader = new XMLMessageImporter(streamReader, clientSession); + } + + @Override + public synchronized void start() throws Exception { + if (writer != null) { + writer.getRawXMLWriter().writeStartDocument(XmlDataConstants.XML_VERSION); + writer.getRawXMLWriter().writeStartElement(XmlDataConstants.MESSAGES_PARENT); + } + + if (reader != null) { + // + reader.getRawXMLReader().nextTag(); + } + } + + @Override + public synchronized void stop() throws Exception { + if (writer != null) { + writer.getRawXMLWriter().writeEndElement(); + writer.getRawXMLWriter().writeEndDocument(); + writer.getRawXMLWriter().flush(); + writer.getRawXMLWriter().close(); + out.flush(); + } + } +} diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java new file mode 100644 index 0000000000..df791043a9 --- /dev/null +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.test; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.cli.Artemis; +import org.apache.activemq.artemis.cli.commands.Run; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test to validate that the CLI doesn't throw improper exceptions when invoked. + */ +public class MessageSerializerTest extends CliTestBase { + + private Connection connection; + + @Before + @Override + public void setup() throws Exception { + setupAuth(); + super.setup(); + startServer(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connection = cf.createConnection("admin", "admin"); + } + + @After + @Override + public void tearDown() throws Exception { + try { + connection.close(); + } finally { + stopServer(); + super.tearDown(); + } + } + + private void setupAuth() throws Exception { + setupAuth(temporaryFolder.getRoot()); + } + + private void setupAuth(File folder) throws Exception { + System.setProperty("java.security.auth.login.config", folder.getAbsolutePath() + "/etc/login.config"); + } + + private void startServer() throws Exception { + File rootDirectory = new File(temporaryFolder.getRoot(), "broker"); + setupAuth(rootDirectory); + Run.setEmbedded(true); + Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login"); + System.setProperty("artemis.instance", rootDirectory.getAbsolutePath()); + Artemis.internalExecute("run"); + } + + private void stopServer() throws Exception { + Artemis.internalExecute("stop"); + assertTrue(Run.latchRunning.await(5, TimeUnit.SECONDS)); + assertEquals(0, LibaioContext.getTotalMaxIO()); + } + + private File createMessageFile() throws IOException { + return temporaryFolder.newFile("messages.xml"); + } + + @Test + public void testTextMessageImportExport() throws Exception { + String address = "test"; + int noMessages = 10; + File file = createMessageFile(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + List sent = new ArrayList<>(noMessages); + for (int i = 0; i < noMessages; i++) { + sent.add(session.createTextMessage(RandomUtil.randomString())); + } + + sendMessages(session, address, sent); + exportMessages(address, noMessages, file); + + // Ensure there's nothing left to consume + MessageConsumer consumer = session.createConsumer(getDestination(address)); + assertNull(consumer.receive(1000)); + consumer.close(); + + importMessages(address, file); + + List received = consumeMessages(session, address, noMessages, false); + for (int i = 0; i < noMessages; i++) { + assertEquals(((TextMessage) sent.get(i)).getText(), ((TextMessage) received.get(i)).getText()); + } + } + + @Test + public void testObjectMessageImportExport() throws Exception { + String address = "test"; + int noMessages = 10; + File file = createMessageFile(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + // Send initial messages. + List sent = new ArrayList<>(noMessages); + for (int i = 0; i < noMessages; i++) { + sent.add(session.createObjectMessage(UUID.randomUUID())); + } + + sendMessages(session, address, sent); + exportMessages(address, noMessages, file); + + // Ensure there's nothing left to consume + MessageConsumer consumer = session.createConsumer(getDestination(address)); + assertNull(consumer.receive(1000)); + consumer.close(); + + importMessages(address, file); + List received = consumeMessages(session, address, noMessages, false); + for (int i = 0; i < noMessages; i++) { + assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject()); + } + } + + @Test + public void testMapMessageImportExport() throws Exception { + String address = "test"; + int noMessages = 10; + String key = "testKey"; + File file = createMessageFile(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + List sent = new ArrayList<>(noMessages); + for (int i = 0; i < noMessages; i++) { + MapMessage m = session.createMapMessage(); + m.setString(key, RandomUtil.randomString()); + sent.add(m); + } + + sendMessages(session, address, sent); + exportMessages(address, noMessages, file); + + // Ensure there's nothing left to consume + MessageConsumer consumer = session.createConsumer(getDestination(address)); + assertNull(consumer.receive(1000)); + consumer.close(); + + importMessages(address, file); + List received = consumeMessages(session, address, noMessages, false); + for (int i = 0; i < noMessages; i++) { + assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key)); + } + } + + private void sendMessages(Session session, String address, List messages) throws Exception { + MessageProducer producer = session.createProducer(getDestination(address)); + for (Message m : messages) { + producer.send(m); + } + } + + private void sendMessages(Session session, Destination destination, List messages) throws Exception { + MessageProducer producer = session.createProducer(destination); + for (Message m : messages) { + producer.send(m); + } + } + + private List consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception { + Destination destination = fqqn ? session.createQueue(address) : getDestination(address); + MessageConsumer consumer = session.createConsumer(destination); + + List messages = new ArrayList<>(); + for (int i = 0; i < noMessages; i++) { + Message m = consumer.receive(1000); + assertNotNull(m); + messages.add(m); + } + return messages; + } + + private void exportMessages(String address, int noMessages, File output) throws Exception { + Artemis.main("consumer", + "--user", "admin", + "--password", "admin", + "--destination", address, + "--message-count", "" + noMessages, + "--data", output.getAbsolutePath()); + } + + private void importMessages(String address, File input) throws Exception { + Artemis.main("producer", + "--user", "admin", + "--password", "admin", + "--destination", address, + "--data", input.getAbsolutePath()); + } + + private void createQueue(String routingTypeOption, String address, String queueName) throws Exception { + Artemis.main("queue", "create", + "--user", "admin", + "--password", "admin", + "--address", address, + "--name", queueName, + routingTypeOption, + "--durable", + "--preserve-on-no-consumers", + "--auto-create-address"); + } + + @Test + public void testSendDirectToQueue() throws Exception { + + String address = "test"; + String queue1Name = "queue1"; + String queue2Name = "queue2"; + + createQueue("--multicast", address, queue1Name); + createQueue("--multicast", address, queue2Name); + + try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) { + + // send messages to queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + Destination queue1 = session.createQueue(address + "::" + queue1Name); + Destination queue2 = session.createQueue(address + "::" + queue2Name); + + MessageConsumer consumer1 = session.createConsumer(queue1); + MessageConsumer consumer2 = session.createConsumer(queue2); + + Artemis.main("producer", + "--user", "admin", + "--password", "admin", + "--destination", "fqqn://" + address + "::" + queue1Name, + "--message-count", "5"); + + assertNull(consumer2.receive(1000)); + assertNotNull(consumer1.receive(1000)); + } + } + + @Test + public void exportFromFQQN() throws Exception { + String addr = "address"; + String queue = "queue"; + String fqqn = addr + "::" + queue; + String destination = "fqqn://" + fqqn; + + File file = createMessageFile(); + int noMessages = 10; + + createQueue("--multicast", addr, queue); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + Topic topic = session.createTopic(addr); + + List messages = new ArrayList<>(noMessages); + for (int i = 0; i < noMessages; i++) { + messages.add(session.createTextMessage(RandomUtil.randomString())); + } + + sendMessages(session, topic, messages); + + exportMessages(destination, noMessages, file); + importMessages(destination, file); + + List recieved = consumeMessages(session, fqqn, noMessages, true); + for (int i = 0; i < noMessages; i++) { + assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText()); + } + } + + //read individual lines from byteStream + private ArrayList getOutputLines(TestActionContext context, boolean errorOutput) throws IOException { + byte[] bytes; + + if (errorOutput) { + bytes = context.getStdErrBytes(); + } else { + bytes = context.getStdoutBytes(); + } + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes))); + ArrayList lines = new ArrayList<>(); + + String currentLine = bufferedReader.readLine(); + while (currentLine != null) { + lines.add(currentLine); + currentLine = bufferedReader.readLine(); + } + + return lines; + } + + private void sendMessages(Session session, String queueName, int messageCount) throws JMSException { + MessageProducer producer = session.createProducer(getDestination(queueName)); + + TextMessage message = session.createTextMessage(getTestMessageBody()); + + for (int i = 0; i < messageCount; i++) { + producer.send(message); + } + } + + private String getTestMessageBody() { + return "Sample Message"; + } + + private Destination getDestination(String queueName) { + return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE); + } +}