From 812776fca7ab7e029bdc5b46da87325d597fd723 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 2 May 2018 11:32:39 +0100 Subject: [PATCH] ARTEMIS-1840 Refactor XML Data Serialiser --- .../tools/xml/XMLMessageExporter.java | 148 ++++++++ .../tools/xml/XMLMessageImporter.java | 331 ++++++++++++++++++ .../commands/tools/xml/XmlDataConstants.java | 4 +- .../commands/tools/xml/XmlDataExporter.java | 117 +------ .../tools/xml/XmlDataExporterUtil.java | 2 +- .../commands/tools/xml/XmlDataImporter.java | 288 +-------------- 6 files changed, 507 insertions(+), 383 deletions(-) create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java create mode 100644 artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java new file mode 100644 index 0000000000..a2fbaddef8 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.reader.TextMessageUtil; + + +/** This is an Utility class that will import the outputs in XML format. */ +public class XMLMessageExporter { + + private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L; + + private XMLStreamWriter xmlWriter; + + public XMLMessageExporter(XMLStreamWriter xmlWriter) { + this.xmlWriter = xmlWriter; + } + + public XMLStreamWriter getRawXMLWriter() { + return xmlWriter; + } + + public void printSingleMessageAsXML(ICoreMessage message, List queues, boolean encodeTextUTF8) throws Exception { + xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); + printMessageAttributes(message); + printMessageProperties(message); + printMessageQueues(queues); + printMessageBody(message.toCore(), encodeTextUTF8); + xmlWriter.writeEndElement(); // end MESSAGES_CHILD + } + + public void printMessageBody(Message message, boolean encodeTextMessageUTF8) throws Exception { + xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); + + if (message.isLargeMessage()) { + printLargeMessageBody((LargeServerMessage) message); + } else { + if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) { + xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString()); + } else { + xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBodyBase64(message)); + } + } + xmlWriter.writeEndElement(); // end MESSAGE_BODY + } + + public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); + LargeBodyEncoder encoder = null; + + try { + encoder = message.toCore().getBodyEncoder(); + encoder.open(); + long totalBytesWritten = 0; + Long bufferSize; + long bodySize = encoder.getLargeBodySize(); + for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) { + Long remainder = bodySize - totalBytesWritten; + if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) { + bufferSize = LARGE_MESSAGE_CHUNK_SIZE; + } else { + bufferSize = remainder; + } + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue()); + encoder.encode(buffer, bufferSize.intValue()); + xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array())); + totalBytesWritten += bufferSize; + } + encoder.close(); + } catch (ActiveMQException e) { + e.printStackTrace(); + } finally { + if (encoder != null) { + try { + encoder.close(); + } catch (ActiveMQException e) { + e.printStackTrace(); + } + } + } + } + + public void printMessageQueues(List queues) throws XMLStreamException { + if (queues != null) { + xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT); + for (String queueName : queues) { + xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName); + } + xmlWriter.writeEndElement(); // end QUEUES_PARENT + } + } + + public void printMessageProperties(Message message) throws XMLStreamException { + xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT); + for (SimpleString key : message.getPropertyNames()) { + Object value = message.getObjectProperty(key); + xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD); + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString()); + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value)); + + // Write the property type as an attribute + String propertyType = XmlDataExporterUtil.getPropertyType(value); + if (propertyType != null) { + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType); + } + } + xmlWriter.writeEndElement(); // end PROPERTIES_PARENT + } + + public void printMessageAttributes(ICoreMessage message) throws XMLStreamException { + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp())); + String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType()); + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType); + if (message.getUserID() != null) { + xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString()); + } + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java new file mode 100644 index 0000000000..09e78d5657 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.cli.commands.tools.xml; + +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; + +/** This is an Utility class that will import the outputs in XML format. */ +public class XMLMessageImporter { + + private static final Logger logger = Logger.getLogger(XMLMessageImporter.class); + + private XMLStreamReader reader; + + private ClientSession session; + + Map oldPrefixTranslation = new HashMap<>(); + + public XMLMessageImporter(XMLStreamReader xmlStreamReader, ClientSession session) { + this.reader = xmlStreamReader; + this.session = session; + } + + public void setOldPrefixTranslation(Map oldPrefixTranslation) { + this.oldPrefixTranslation = oldPrefixTranslation; + } + + public XMLStreamReader getRawXMLReader() { + return reader; + } + + public MessageInfo readMessage(boolean decodeUTF8) throws Exception { + if (!reader.hasNext()) return null; + + Byte type = 0; + Byte priority = 0; + Long expiration = 0L; + Long timestamp = 0L; + Long id = 0L; + org.apache.activemq.artemis.utils.UUID userId = null; + ArrayList queues = new ArrayList<>(); + + // get message's attributes + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.MESSAGE_TYPE: + type = getMessageType(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_PRIORITY: + priority = Byte.parseByte(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_EXPIRATION: + expiration = Long.parseLong(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_TIMESTAMP: + timestamp = Long.parseLong(reader.getAttributeValue(i)); + break; + case XmlDataConstants.MESSAGE_USER_ID: + userId = UUIDGenerator.getInstance().generateUUID(); + break; + case XmlDataConstants.MESSAGE_ID: + id = Long.parseLong(reader.getAttributeValue(i)); + break; + } + } + + Message message = session.createMessage(type, true, expiration, timestamp, priority); + + message.setUserID(userId); + + boolean endLoop = false; + + File largeMessageTemporaryFile = null; + // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) + while (reader.hasNext()) { + int eventType = reader.getEventType(); + switch (eventType) { + case XMLStreamConstants.START_ELEMENT: + if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { + largeMessageTemporaryFile = processMessageBody(message.toCore(), decodeUTF8); + } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { + processMessageProperties(message); + } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { + processMessageQueues(queues); + } + break; + case XMLStreamConstants.END_ELEMENT: + if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { + endLoop = true; + } + break; + } + if (endLoop) { + break; + } + reader.next(); + } + return new MessageInfo(id, queues, message, largeMessageTemporaryFile); + } + + private Byte getMessageType(String value) { + Byte type = Message.DEFAULT_TYPE; + switch (value) { + case XmlDataConstants.DEFAULT_TYPE_PRETTY: + type = Message.DEFAULT_TYPE; + break; + case XmlDataConstants.BYTES_TYPE_PRETTY: + type = Message.BYTES_TYPE; + break; + case XmlDataConstants.MAP_TYPE_PRETTY: + type = Message.MAP_TYPE; + break; + case XmlDataConstants.OBJECT_TYPE_PRETTY: + type = Message.OBJECT_TYPE; + break; + case XmlDataConstants.STREAM_TYPE_PRETTY: + type = Message.STREAM_TYPE; + break; + case XmlDataConstants.TEXT_TYPE_PRETTY: + type = Message.TEXT_TYPE; + break; + } + return type; + } + + private void processMessageQueues(ArrayList queues) { + for (int i = 0; i < reader.getAttributeCount(); i++) { + if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) { + String queueName = reader.getAttributeValue(i); + String translation = checkPrefix(queueName); + queues.add(translation); + } + } + } + + private String checkPrefix(String queueName) { + String newQueueName = oldPrefixTranslation.get(queueName); + if (newQueueName == null) { + newQueueName = queueName; + } + return newQueueName; + } + + private void processMessageProperties(Message message) { + String key = ""; + String value = ""; + String propertyType = ""; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.PROPERTY_NAME: + key = reader.getAttributeValue(i); + break; + case XmlDataConstants.PROPERTY_VALUE: + value = reader.getAttributeValue(i); + break; + case XmlDataConstants.PROPERTY_TYPE: + propertyType = reader.getAttributeValue(i); + break; + } + } + + if (value.equals(XmlDataConstants.NULL)) { + value = null; + } + + switch (propertyType) { + case XmlDataConstants.PROPERTY_TYPE_SHORT: + message.putShortProperty(key, Short.parseShort(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BOOLEAN: + message.putBooleanProperty(key, Boolean.parseBoolean(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BYTE: + message.putByteProperty(key, Byte.parseByte(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_BYTES: + message.putBytesProperty(key, value == null ? null : decode(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_DOUBLE: + message.putDoubleProperty(key, Double.parseDouble(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_FLOAT: + message.putFloatProperty(key, Float.parseFloat(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_INTEGER: + message.putIntProperty(key, Integer.parseInt(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_LONG: + message.putLongProperty(key, Long.parseLong(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING: + message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value)); + break; + case XmlDataConstants.PROPERTY_TYPE_STRING: + message.putStringProperty(key, value); + break; + } + } + + private File processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException { + File tempFileName = null; + boolean isLarge = false; + + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) { + isLarge = Boolean.parseBoolean(reader.getAttributeValue(i)); + } + } + reader.next(); + if (logger.isDebugEnabled()) { + logger.debug("XMLStreamReader impl: " + reader); + } + if (isLarge) { + tempFileName = File.createTempFile("largeMessage", ".tmp"); + if (logger.isDebugEnabled()) { + logger.debug("Creating temp file " + tempFileName + " for large message."); + } + try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) { + getMessageBodyBytes(bytes -> out.write(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage); + } + FileInputStream fileInputStream = new FileInputStream(tempFileName); + BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); + ((ClientMessage) message).setBodyInputStream(bufferedInput); + } else { + getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage); + } + + return tempFileName; + } + + /** + * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't + * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need + * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each + * CDATA has to be decoded in its entirety. + * + * @param processor used to deal with the decoded CDATA elements + * @param textMessage If this a text message we decode UTF8 and encode as a simple string + */ + private void getMessageBodyBytes(MessageBodyBytesProcessor processor, boolean decodeTextMessage) throws IOException, XMLStreamException { + int currentEventType; + StringBuilder cdata = new StringBuilder(); + while (reader.hasNext()) { + currentEventType = reader.getEventType(); + if (currentEventType == XMLStreamConstants.END_ELEMENT) { + break; + } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) { + /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to + * the processor, and reset the cdata for the next event(s) + */ + if (decodeTextMessage) { + SimpleString text = new SimpleString(cdata.toString()); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(SimpleString.sizeofNullableString(text)); + SimpleString.writeNullableSimpleString(byteBuf, text); + byte[] bytes = new byte[SimpleString.sizeofNullableString(text)]; + byteBuf.readBytes(bytes); + processor.processBodyBytes(bytes); + } else { + processor.processBodyBytes(decode(cdata.toString())); + cdata.setLength(0); + } + } else { + cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim()); + } + reader.next(); + } + } + + private static byte[] decode(String data) { + return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + private interface MessageBodyBytesProcessor { + void processBodyBytes(byte[] bytes) throws IOException; + } + + public class MessageInfo { + public long id; + public List queues; + public Message message; + public File tempFile; + + MessageInfo(long id, List queues, Message message, File tempFile) { + this.message = message; + this.queues = queues; + this.id = id; + this.tempFile = tempFile; + } + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java index 24e56b2e58..a5a7c97d36 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java @@ -26,7 +26,7 @@ public final class XmlDataConstants { // Utility } - static final String XML_VERSION = "1.0"; + public static final String XML_VERSION = "1.0"; static final String DOCUMENT_PARENT = "activemq-journal"; static final String BINDINGS_PARENT = "bindings"; @@ -50,7 +50,7 @@ public final class XmlDataConstants { static final String ADDRESS_BINDING_ID = "id"; static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types"; - static final String MESSAGES_PARENT = "messages"; + public static final String MESSAGES_PARENT = "messages"; static final String MESSAGES_CHILD = "message"; static final String MESSAGE_ID = "id"; static final String MESSAGE_PRIORITY = "priority"; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java index 85eac4a940..0125cb75a5 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java @@ -36,7 +36,6 @@ import java.util.TreeMap; import io.airlift.airline.Command; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -48,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; @@ -66,12 +64,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; -import org.apache.activemq.artemis.core.server.LargeServerMessage; @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends DBOption { - private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L; private XMLStreamWriter xmlWriter; // an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID @@ -92,6 +88,8 @@ public final class XmlDataExporter extends DBOption { long bindingsPrinted = 0L; + XMLMessageExporter exporter; + @Override public Object execute(ActionContext context) throws Exception { super.execute(context); @@ -141,7 +139,7 @@ public final class XmlDataExporter extends DBOption { XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8"); PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter); xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler); - + exporter = new XMLMessageExporter(xmlWriter); writeXMLData(); } @@ -317,7 +315,7 @@ public final class XmlDataExporter extends DBOption { private void printDataAsXML() { try { - xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION); + xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT); printBindingsAsXML(); printAllMessagesAsXML(); @@ -375,6 +373,10 @@ public final class XmlDataExporter extends DBOption { xmlWriter.writeEndElement(); // end "messages" } + private void printSingleMessageAsXML(ICoreMessage message, List queues) throws Exception { + exporter.printSingleMessageAsXML(message, queues, false); + messagesPrinted++; + } /** * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions * from the journal). @@ -444,104 +446,9 @@ public final class XmlDataExporter extends DBOption { } } - private void printSingleMessageAsXML(ICoreMessage message, List queues) throws Exception { - xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); - printMessageAttributes(message); - printMessageProperties(message); - printMessageQueues(queues); - printMessageBody(message.toCore()); - xmlWriter.writeEndElement(); // end MESSAGES_CHILD - messagesPrinted++; - } - - private void printMessageBody(Message message) throws Exception { - xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); - - if (message.toCore().isLargeMessage()) { - printLargeMessageBody((LargeServerMessage) message); - } else { - xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message)); - } - xmlWriter.writeEndElement(); // end MESSAGE_BODY - } - - private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException { - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString()); - LargeBodyEncoder encoder = null; - - try { - encoder = message.toCore().getBodyEncoder(); - encoder.open(); - long totalBytesWritten = 0; - Long bufferSize; - long bodySize = encoder.getLargeBodySize(); - for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) { - Long remainder = bodySize - totalBytesWritten; - if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) { - bufferSize = LARGE_MESSAGE_CHUNK_SIZE; - } else { - bufferSize = remainder; - } - ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue()); - encoder.encode(buffer, bufferSize.intValue()); - xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array())); - totalBytesWritten += bufferSize; - } - encoder.close(); - } catch (ActiveMQException e) { - e.printStackTrace(); - } finally { - if (encoder != null) { - try { - encoder.close(); - } catch (ActiveMQException e) { - e.printStackTrace(); - } - } - } - } - - private void printMessageQueues(List queues) throws XMLStreamException { - xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT); - for (String queueName : queues) { - xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD); - xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName); - } - xmlWriter.writeEndElement(); // end QUEUES_PARENT - } - - private void printMessageProperties(Message message) throws XMLStreamException { - xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT); - for (SimpleString key : message.getPropertyNames()) { - Object value = message.getObjectProperty(key); - xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD); - xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString()); - xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value)); - - // Write the property type as an attribute - String propertyType = XmlDataExporterUtil.getPropertyType(value); - if (propertyType != null) { - xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType); - } - } - xmlWriter.writeEndElement(); // end PROPERTIES_PARENT - } - - private void printMessageAttributes(ICoreMessage message) throws XMLStreamException { - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp())); - String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType()); - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType); - if (message.getUserID() != null) { - xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString()); - } - } - - private List extractQueueNames(HashMap refMap) { + private List extractQueueNames(HashMap refMap) { List queues = new ArrayList<>(); - for (ReferenceDescribe ref : refMap.values()) { + for (DescribeJournal.ReferenceDescribe ref : refMap.values()) { queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString()); } return queues; @@ -552,7 +459,7 @@ public final class XmlDataExporter extends DBOption { /** * Proxy to handle indenting the XML since javax.xml.stream.XMLStreamWriter doesn't support that. */ - static class PrettyPrintHandler implements InvocationHandler { + public static class PrettyPrintHandler implements InvocationHandler { private final XMLStreamWriter target; @@ -564,7 +471,7 @@ public final class XmlDataExporter extends DBOption { boolean wrap = true; - PrettyPrintHandler(XMLStreamWriter target) { + public PrettyPrintHandler(XMLStreamWriter target) { this.target = target; } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java index df48dcf471..7e6545cec9 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java @@ -88,7 +88,7 @@ public class XmlDataExporterUtil { /** * Base64 encode a ServerMessage body into the proper XML format */ - static String encodeMessageBody(final Message message) throws Exception { + static String encodeMessageBodyBase64(final Message message) throws Exception { Preconditions.checkNotNull(message, "ServerMessage can not be null"); ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer(); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java index 595ed557b0..bec2fbdfad 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java @@ -19,25 +19,18 @@ package org.apache.activemq.artemis.cli.commands.tools.xml; import javax.xml.XMLConstants; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamConstants; -import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamReader; import javax.xml.transform.stax.StAXSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; import javax.xml.validation.Validator; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.URL; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.ArrayList; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -47,7 +40,6 @@ import java.util.TreeSet; import io.airlift.airline.Command; import io.airlift.airline.Option; -import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -68,10 +60,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ListUtil; -import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; /** @@ -86,6 +76,8 @@ public final class XmlDataImporter extends ActionAbstract { private XMLStreamReader reader; + private XMLMessageImporter messageReader; + // this session is really only needed if the "session" variable does not auto-commit sends ClientSession managementSession; @@ -123,7 +115,7 @@ public final class XmlDataImporter extends ActionAbstract { @Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports") public boolean legacyPrefixes = false; - TreeSet messages; + TreeSet messages; public String getPassword() { return password; @@ -179,6 +171,9 @@ public final class XmlDataImporter extends ActionAbstract { ClientSession session, ClientSession managementSession) throws Exception { reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); + messageReader = new XMLMessageImporter(reader, session); + messageReader.setOldPrefixTranslation(oldPrefixTranslation); + this.session = session; if (managementSession != null) { this.managementSession = managementSession; @@ -237,9 +232,9 @@ public final class XmlDataImporter extends ActionAbstract { private void processXml() throws Exception { if (sort) { - messages = new TreeSet(new Comparator() { + messages = new TreeSet(new Comparator() { @Override - public int compare(MessageTemp o1, MessageTemp o2) { + public int compare(XMLMessageImporter.MessageInfo o1, XMLMessageImporter.MessageInfo o2) { if (o1.id == o2.id) { return 0; } else if (o1.id > o2.id) { @@ -270,8 +265,8 @@ public final class XmlDataImporter extends ActionAbstract { } if (sort) { - for (MessageTemp msgtmp : messages) { - sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName); + for (XMLMessageImporter.MessageInfo msgtmp : messages) { + sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFile); } } @@ -288,118 +283,14 @@ public final class XmlDataImporter extends ActionAbstract { } private void processMessage() throws Exception { - Byte type = 0; - Byte priority = 0; - Long expiration = 0L; - Long timestamp = 0L; - Long id = 0L; - org.apache.activemq.artemis.utils.UUID userId = null; - ArrayList queues = new ArrayList<>(); - - // get message's attributes - for (int i = 0; i < reader.getAttributeCount(); i++) { - String attributeName = reader.getAttributeLocalName(i); - switch (attributeName) { - case XmlDataConstants.MESSAGE_TYPE: - type = getMessageType(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_PRIORITY: - priority = Byte.parseByte(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_EXPIRATION: - expiration = Long.parseLong(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_TIMESTAMP: - timestamp = Long.parseLong(reader.getAttributeValue(i)); - break; - case XmlDataConstants.MESSAGE_USER_ID: - userId = UUIDGenerator.getInstance().generateUUID(); - break; - case XmlDataConstants.MESSAGE_ID: - id = Long.parseLong(reader.getAttributeValue(i)); - break; - } - } - - Message message = session.createMessage(type, true, expiration, timestamp, priority); - message.setUserID(userId); - - boolean endLoop = false; - - File largeMessageTemporaryFile = null; - // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.) - while (reader.hasNext()) { - int eventType = reader.getEventType(); - switch (eventType) { - case XMLStreamConstants.START_ELEMENT: - if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { - largeMessageTemporaryFile = processMessageBody(message.toCore()); - } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { - processMessageProperties(message); - } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { - processMessageQueues(queues); - } - break; - case XMLStreamConstants.END_ELEMENT: - if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { - endLoop = true; - } - break; - } - if (endLoop) { - break; - } - reader.next(); - } - + XMLMessageImporter.MessageInfo info = messageReader.readMessage(false); if (sort) { - messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile)); + messages.add(info); } else { - sendMessage(queues, message, largeMessageTemporaryFile); + sendMessage(info.queues, info.message, info.tempFile); } } - - class MessageTemp { - long id; - List queues; - Message message; - File tempFileName; - - MessageTemp(long id, List queues, Message message, File tempFileName) { - this.message = message; - this.queues = queues; - this.message = message; - this.id = id; - this.tempFileName = tempFileName; - } - } - - private Byte getMessageType(String value) { - Byte type = Message.DEFAULT_TYPE; - switch (value) { - case XmlDataConstants.DEFAULT_TYPE_PRETTY: - type = Message.DEFAULT_TYPE; - break; - case XmlDataConstants.BYTES_TYPE_PRETTY: - type = Message.BYTES_TYPE; - break; - case XmlDataConstants.MAP_TYPE_PRETTY: - type = Message.MAP_TYPE; - break; - case XmlDataConstants.OBJECT_TYPE_PRETTY: - type = Message.OBJECT_TYPE; - break; - case XmlDataConstants.STREAM_TYPE_PRETTY: - type = Message.STREAM_TYPE; - break; - case XmlDataConstants.TEXT_TYPE_PRETTY: - type = Message.TEXT_TYPE; - break; - } - return type; - } - private void sendMessage(List queues, Message message, File tempFileName) throws Exception { StringBuilder logMessage = new StringBuilder(); String destination = addressMap.get(queues.get(0)); @@ -460,153 +351,6 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageQueues(ArrayList queues) { - for (int i = 0; i < reader.getAttributeCount(); i++) { - if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) { - String queueName = reader.getAttributeValue(i); - String translation = checkPrefix(queueName); - queues.add(translation); - } - } - } - - private String checkPrefix(String queueName) { - String newQueueName = oldPrefixTranslation.get(queueName); - if (newQueueName == null) { - newQueueName = queueName; - } - return newQueueName; - } - - private void processMessageProperties(Message message) { - String key = ""; - String value = ""; - String propertyType = ""; - - for (int i = 0; i < reader.getAttributeCount(); i++) { - String attributeName = reader.getAttributeLocalName(i); - switch (attributeName) { - case XmlDataConstants.PROPERTY_NAME: - key = reader.getAttributeValue(i); - break; - case XmlDataConstants.PROPERTY_VALUE: - value = reader.getAttributeValue(i); - break; - case XmlDataConstants.PROPERTY_TYPE: - propertyType = reader.getAttributeValue(i); - break; - } - } - - if (value.equals(XmlDataConstants.NULL)) { - value = null; - } - - switch (propertyType) { - case XmlDataConstants.PROPERTY_TYPE_SHORT: - message.putShortProperty(key, Short.parseShort(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_BOOLEAN: - message.putBooleanProperty(key, Boolean.parseBoolean(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_BYTE: - message.putByteProperty(key, Byte.parseByte(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_BYTES: - message.putBytesProperty(key, value == null ? null : decode(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_DOUBLE: - message.putDoubleProperty(key, Double.parseDouble(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_FLOAT: - message.putFloatProperty(key, Float.parseFloat(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_INTEGER: - message.putIntProperty(key, Integer.parseInt(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_LONG: - message.putLongProperty(key, Long.parseLong(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING: - message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value)); - break; - case XmlDataConstants.PROPERTY_TYPE_STRING: - message.putStringProperty(key, value); - break; - } - } - - private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { - File tempFileName = null; - boolean isLarge = false; - - for (int i = 0; i < reader.getAttributeCount(); i++) { - String attributeName = reader.getAttributeLocalName(i); - if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) { - isLarge = Boolean.parseBoolean(reader.getAttributeValue(i)); - } - } - reader.next(); - if (logger.isDebugEnabled()) { - logger.debug("XMLStreamReader impl: " + reader); - } - if (isLarge) { - tempFileName = File.createTempFile("largeMessage", ".tmp"); - if (logger.isDebugEnabled()) { - logger.debug("Creating temp file " + tempFileName + " for large message."); - } - try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) { - getMessageBodyBytes(new MessageBodyBytesProcessor() { - @Override - public void processBodyBytes(byte[] bytes) throws IOException { - out.write(bytes); - } - }); - } - FileInputStream fileInputStream = new FileInputStream(tempFileName); - BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); - ((ClientMessage) message).setBodyInputStream(bufferedInput); - } else { - getMessageBodyBytes(new MessageBodyBytesProcessor() { - @Override - public void processBodyBytes(byte[] bytes) throws IOException { - message.getBodyBuffer().writeBytes(bytes); - } - }); - } - - return tempFileName; - } - - /** - * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't - * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need - * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each - * CDATA has to be decoded in its entirety. - * - * @param processor used to deal with the decoded CDATA elements - */ - private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException { - int currentEventType; - StringBuilder cdata = new StringBuilder(); - while (reader.hasNext()) { - currentEventType = reader.getEventType(); - if (currentEventType == XMLStreamConstants.END_ELEMENT) { - break; - } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) { - /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to - * the processor, and reset the cdata for the next event(s) - */ - processor.processBodyBytes(decode(cdata.toString())); - cdata.setLength(0); - } else { - cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim()); - } - reader.next(); - } - } - - private void oldBinding() throws Exception { String queueName = ""; String address = ""; @@ -762,11 +506,5 @@ public final class XmlDataImporter extends ActionAbstract { } } - private static byte[] decode(String data) { - return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); - } - private interface MessageBodyBytesProcessor { - void processBodyBytes(byte[] bytes) throws IOException; - } }