ARTEMIS-1840 Refactor XML Data Serialiser
This commit is contained in:
parent
13fac86082
commit
812776fca7
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml
|
@ -0,0 +1,148 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.cli.commands.tools.xml;
|
||||
|
||||
import javax.xml.stream.XMLStreamException;
|
||||
import javax.xml.stream.XMLStreamWriter;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
import org.apache.activemq.artemis.reader.TextMessageUtil;
|
||||
|
||||
|
||||
/** This is an Utility class that will import the outputs in XML format. */
|
||||
public class XMLMessageExporter {
|
||||
|
||||
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
|
||||
|
||||
private XMLStreamWriter xmlWriter;
|
||||
|
||||
public XMLMessageExporter(XMLStreamWriter xmlWriter) {
|
||||
this.xmlWriter = xmlWriter;
|
||||
}
|
||||
|
||||
public XMLStreamWriter getRawXMLWriter() {
|
||||
return xmlWriter;
|
||||
}
|
||||
|
||||
public void printSingleMessageAsXML(ICoreMessage message, List<String> queues, boolean encodeTextUTF8) throws Exception {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
|
||||
printMessageAttributes(message);
|
||||
printMessageProperties(message);
|
||||
printMessageQueues(queues);
|
||||
printMessageBody(message.toCore(), encodeTextUTF8);
|
||||
xmlWriter.writeEndElement(); // end MESSAGES_CHILD
|
||||
}
|
||||
|
||||
public void printMessageBody(Message message, boolean encodeTextMessageUTF8) throws Exception {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
|
||||
|
||||
if (message.isLargeMessage()) {
|
||||
printLargeMessageBody((LargeServerMessage) message);
|
||||
} else {
|
||||
if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) {
|
||||
xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString());
|
||||
} else {
|
||||
xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBodyBase64(message));
|
||||
}
|
||||
}
|
||||
xmlWriter.writeEndElement(); // end MESSAGE_BODY
|
||||
}
|
||||
|
||||
public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
|
||||
LargeBodyEncoder encoder = null;
|
||||
|
||||
try {
|
||||
encoder = message.toCore().getBodyEncoder();
|
||||
encoder.open();
|
||||
long totalBytesWritten = 0;
|
||||
Long bufferSize;
|
||||
long bodySize = encoder.getLargeBodySize();
|
||||
for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
|
||||
Long remainder = bodySize - totalBytesWritten;
|
||||
if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
|
||||
bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
|
||||
} else {
|
||||
bufferSize = remainder;
|
||||
}
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
|
||||
encoder.encode(buffer, bufferSize.intValue());
|
||||
xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
|
||||
totalBytesWritten += bufferSize;
|
||||
}
|
||||
encoder.close();
|
||||
} catch (ActiveMQException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
if (encoder != null) {
|
||||
try {
|
||||
encoder.close();
|
||||
} catch (ActiveMQException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void printMessageQueues(List<String> queues) throws XMLStreamException {
|
||||
if (queues != null) {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
|
||||
for (String queueName : queues) {
|
||||
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
|
||||
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
|
||||
}
|
||||
xmlWriter.writeEndElement(); // end QUEUES_PARENT
|
||||
}
|
||||
}
|
||||
|
||||
public void printMessageProperties(Message message) throws XMLStreamException {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
|
||||
for (SimpleString key : message.getPropertyNames()) {
|
||||
Object value = message.getObjectProperty(key);
|
||||
xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
|
||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
|
||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
|
||||
|
||||
// Write the property type as an attribute
|
||||
String propertyType = XmlDataExporterUtil.getPropertyType(value);
|
||||
if (propertyType != null) {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
|
||||
}
|
||||
}
|
||||
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
|
||||
}
|
||||
|
||||
public void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
|
||||
String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
|
||||
if (message.getUserID() != null) {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,331 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.cli.commands.tools.xml;
|
||||
|
||||
import javax.xml.stream.XMLStreamConstants;
|
||||
import javax.xml.stream.XMLStreamException;
|
||||
import javax.xml.stream.XMLStreamReader;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.utils.Base64;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/** This is an Utility class that will import the outputs in XML format. */
|
||||
public class XMLMessageImporter {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(XMLMessageImporter.class);
|
||||
|
||||
private XMLStreamReader reader;
|
||||
|
||||
private ClientSession session;
|
||||
|
||||
Map<String, String> oldPrefixTranslation = new HashMap<>();
|
||||
|
||||
public XMLMessageImporter(XMLStreamReader xmlStreamReader, ClientSession session) {
|
||||
this.reader = xmlStreamReader;
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public void setOldPrefixTranslation(Map<String, String> oldPrefixTranslation) {
|
||||
this.oldPrefixTranslation = oldPrefixTranslation;
|
||||
}
|
||||
|
||||
public XMLStreamReader getRawXMLReader() {
|
||||
return reader;
|
||||
}
|
||||
|
||||
public MessageInfo readMessage(boolean decodeUTF8) throws Exception {
|
||||
if (!reader.hasNext()) return null;
|
||||
|
||||
Byte type = 0;
|
||||
Byte priority = 0;
|
||||
Long expiration = 0L;
|
||||
Long timestamp = 0L;
|
||||
Long id = 0L;
|
||||
org.apache.activemq.artemis.utils.UUID userId = null;
|
||||
ArrayList<String> queues = new ArrayList<>();
|
||||
|
||||
// get message's attributes
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
String attributeName = reader.getAttributeLocalName(i);
|
||||
switch (attributeName) {
|
||||
case XmlDataConstants.MESSAGE_TYPE:
|
||||
type = getMessageType(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_PRIORITY:
|
||||
priority = Byte.parseByte(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_EXPIRATION:
|
||||
expiration = Long.parseLong(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_TIMESTAMP:
|
||||
timestamp = Long.parseLong(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_USER_ID:
|
||||
userId = UUIDGenerator.getInstance().generateUUID();
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_ID:
|
||||
id = Long.parseLong(reader.getAttributeValue(i));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Message message = session.createMessage(type, true, expiration, timestamp, priority);
|
||||
|
||||
message.setUserID(userId);
|
||||
|
||||
boolean endLoop = false;
|
||||
|
||||
File largeMessageTemporaryFile = null;
|
||||
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
|
||||
while (reader.hasNext()) {
|
||||
int eventType = reader.getEventType();
|
||||
switch (eventType) {
|
||||
case XMLStreamConstants.START_ELEMENT:
|
||||
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
|
||||
largeMessageTemporaryFile = processMessageBody(message.toCore(), decodeUTF8);
|
||||
} else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
|
||||
processMessageProperties(message);
|
||||
} else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
|
||||
processMessageQueues(queues);
|
||||
}
|
||||
break;
|
||||
case XMLStreamConstants.END_ELEMENT:
|
||||
if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
|
||||
endLoop = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (endLoop) {
|
||||
break;
|
||||
}
|
||||
reader.next();
|
||||
}
|
||||
return new MessageInfo(id, queues, message, largeMessageTemporaryFile);
|
||||
}
|
||||
|
||||
private Byte getMessageType(String value) {
|
||||
Byte type = Message.DEFAULT_TYPE;
|
||||
switch (value) {
|
||||
case XmlDataConstants.DEFAULT_TYPE_PRETTY:
|
||||
type = Message.DEFAULT_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.BYTES_TYPE_PRETTY:
|
||||
type = Message.BYTES_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.MAP_TYPE_PRETTY:
|
||||
type = Message.MAP_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.OBJECT_TYPE_PRETTY:
|
||||
type = Message.OBJECT_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.STREAM_TYPE_PRETTY:
|
||||
type = Message.STREAM_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.TEXT_TYPE_PRETTY:
|
||||
type = Message.TEXT_TYPE;
|
||||
break;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
private void processMessageQueues(ArrayList<String> queues) {
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
|
||||
String queueName = reader.getAttributeValue(i);
|
||||
String translation = checkPrefix(queueName);
|
||||
queues.add(translation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String checkPrefix(String queueName) {
|
||||
String newQueueName = oldPrefixTranslation.get(queueName);
|
||||
if (newQueueName == null) {
|
||||
newQueueName = queueName;
|
||||
}
|
||||
return newQueueName;
|
||||
}
|
||||
|
||||
private void processMessageProperties(Message message) {
|
||||
String key = "";
|
||||
String value = "";
|
||||
String propertyType = "";
|
||||
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
String attributeName = reader.getAttributeLocalName(i);
|
||||
switch (attributeName) {
|
||||
case XmlDataConstants.PROPERTY_NAME:
|
||||
key = reader.getAttributeValue(i);
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_VALUE:
|
||||
value = reader.getAttributeValue(i);
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE:
|
||||
propertyType = reader.getAttributeValue(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (value.equals(XmlDataConstants.NULL)) {
|
||||
value = null;
|
||||
}
|
||||
|
||||
switch (propertyType) {
|
||||
case XmlDataConstants.PROPERTY_TYPE_SHORT:
|
||||
message.putShortProperty(key, Short.parseShort(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
|
||||
message.putBooleanProperty(key, Boolean.parseBoolean(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_BYTE:
|
||||
message.putByteProperty(key, Byte.parseByte(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_BYTES:
|
||||
message.putBytesProperty(key, value == null ? null : decode(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
|
||||
message.putDoubleProperty(key, Double.parseDouble(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_FLOAT:
|
||||
message.putFloatProperty(key, Float.parseFloat(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_INTEGER:
|
||||
message.putIntProperty(key, Integer.parseInt(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_LONG:
|
||||
message.putLongProperty(key, Long.parseLong(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
|
||||
message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_STRING:
|
||||
message.putStringProperty(key, value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private File processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException {
|
||||
File tempFileName = null;
|
||||
boolean isLarge = false;
|
||||
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
String attributeName = reader.getAttributeLocalName(i);
|
||||
if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
|
||||
isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
|
||||
}
|
||||
}
|
||||
reader.next();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("XMLStreamReader impl: " + reader);
|
||||
}
|
||||
if (isLarge) {
|
||||
tempFileName = File.createTempFile("largeMessage", ".tmp");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Creating temp file " + tempFileName + " for large message.");
|
||||
}
|
||||
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
|
||||
getMessageBodyBytes(bytes -> out.write(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
|
||||
}
|
||||
FileInputStream fileInputStream = new FileInputStream(tempFileName);
|
||||
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
|
||||
((ClientMessage) message).setBodyInputStream(bufferedInput);
|
||||
} else {
|
||||
getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage);
|
||||
}
|
||||
|
||||
return tempFileName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
|
||||
* read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
|
||||
* to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
|
||||
* CDATA has to be decoded in its entirety.
|
||||
*
|
||||
* @param processor used to deal with the decoded CDATA elements
|
||||
* @param textMessage If this a text message we decode UTF8 and encode as a simple string
|
||||
*/
|
||||
private void getMessageBodyBytes(MessageBodyBytesProcessor processor, boolean decodeTextMessage) throws IOException, XMLStreamException {
|
||||
int currentEventType;
|
||||
StringBuilder cdata = new StringBuilder();
|
||||
while (reader.hasNext()) {
|
||||
currentEventType = reader.getEventType();
|
||||
if (currentEventType == XMLStreamConstants.END_ELEMENT) {
|
||||
break;
|
||||
} else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
|
||||
/* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
|
||||
* the processor, and reset the cdata for the next event(s)
|
||||
*/
|
||||
if (decodeTextMessage) {
|
||||
SimpleString text = new SimpleString(cdata.toString());
|
||||
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(SimpleString.sizeofNullableString(text));
|
||||
SimpleString.writeNullableSimpleString(byteBuf, text);
|
||||
byte[] bytes = new byte[SimpleString.sizeofNullableString(text)];
|
||||
byteBuf.readBytes(bytes);
|
||||
processor.processBodyBytes(bytes);
|
||||
} else {
|
||||
processor.processBodyBytes(decode(cdata.toString()));
|
||||
cdata.setLength(0);
|
||||
}
|
||||
} else {
|
||||
cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
|
||||
}
|
||||
reader.next();
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] decode(String data) {
|
||||
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
||||
}
|
||||
|
||||
private interface MessageBodyBytesProcessor {
|
||||
void processBodyBytes(byte[] bytes) throws IOException;
|
||||
}
|
||||
|
||||
public class MessageInfo {
|
||||
public long id;
|
||||
public List<String> queues;
|
||||
public Message message;
|
||||
public File tempFile;
|
||||
|
||||
MessageInfo(long id, List<String> queues, Message message, File tempFile) {
|
||||
this.message = message;
|
||||
this.queues = queues;
|
||||
this.id = id;
|
||||
this.tempFile = tempFile;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,7 +26,7 @@ public final class XmlDataConstants {
|
|||
// Utility
|
||||
}
|
||||
|
||||
static final String XML_VERSION = "1.0";
|
||||
public static final String XML_VERSION = "1.0";
|
||||
static final String DOCUMENT_PARENT = "activemq-journal";
|
||||
static final String BINDINGS_PARENT = "bindings";
|
||||
|
||||
|
@ -50,7 +50,7 @@ public final class XmlDataConstants {
|
|||
static final String ADDRESS_BINDING_ID = "id";
|
||||
static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
|
||||
|
||||
static final String MESSAGES_PARENT = "messages";
|
||||
public static final String MESSAGES_PARENT = "messages";
|
||||
static final String MESSAGES_CHILD = "message";
|
||||
static final String MESSAGE_ID = "id";
|
||||
static final String MESSAGE_PRIORITY = "priority";
|
||||
|
|
|
@ -36,7 +36,6 @@ import java.util.TreeMap;
|
|||
import io.airlift.airline.Command;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
|
@ -48,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.Journal;
|
|||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
|
||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||
|
@ -66,12 +64,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
|
||||
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
|
||||
public final class XmlDataExporter extends DBOption {
|
||||
|
||||
private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
|
||||
private XMLStreamWriter xmlWriter;
|
||||
|
||||
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
|
||||
|
@ -92,6 +88,8 @@ public final class XmlDataExporter extends DBOption {
|
|||
|
||||
long bindingsPrinted = 0L;
|
||||
|
||||
XMLMessageExporter exporter;
|
||||
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
|
@ -141,7 +139,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
|
||||
PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
|
||||
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
|
||||
|
||||
exporter = new XMLMessageExporter(xmlWriter);
|
||||
writeXMLData();
|
||||
}
|
||||
|
||||
|
@ -317,7 +315,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
|
||||
private void printDataAsXML() {
|
||||
try {
|
||||
xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
|
||||
|
||||
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
|
||||
printBindingsAsXML();
|
||||
printAllMessagesAsXML();
|
||||
|
@ -375,6 +373,10 @@ public final class XmlDataExporter extends DBOption {
|
|||
xmlWriter.writeEndElement(); // end "messages"
|
||||
}
|
||||
|
||||
private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
|
||||
exporter.printSingleMessageAsXML(message, queues, false);
|
||||
messagesPrinted++;
|
||||
}
|
||||
/**
|
||||
* Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
|
||||
* from the journal).
|
||||
|
@ -444,104 +446,9 @@ public final class XmlDataExporter extends DBOption {
|
|||
}
|
||||
}
|
||||
|
||||
private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
|
||||
printMessageAttributes(message);
|
||||
printMessageProperties(message);
|
||||
printMessageQueues(queues);
|
||||
printMessageBody(message.toCore());
|
||||
xmlWriter.writeEndElement(); // end MESSAGES_CHILD
|
||||
messagesPrinted++;
|
||||
}
|
||||
|
||||
private void printMessageBody(Message message) throws Exception {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
|
||||
|
||||
if (message.toCore().isLargeMessage()) {
|
||||
printLargeMessageBody((LargeServerMessage) message);
|
||||
} else {
|
||||
xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
|
||||
}
|
||||
xmlWriter.writeEndElement(); // end MESSAGE_BODY
|
||||
}
|
||||
|
||||
private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
|
||||
LargeBodyEncoder encoder = null;
|
||||
|
||||
try {
|
||||
encoder = message.toCore().getBodyEncoder();
|
||||
encoder.open();
|
||||
long totalBytesWritten = 0;
|
||||
Long bufferSize;
|
||||
long bodySize = encoder.getLargeBodySize();
|
||||
for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
|
||||
Long remainder = bodySize - totalBytesWritten;
|
||||
if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
|
||||
bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
|
||||
} else {
|
||||
bufferSize = remainder;
|
||||
}
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
|
||||
encoder.encode(buffer, bufferSize.intValue());
|
||||
xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
|
||||
totalBytesWritten += bufferSize;
|
||||
}
|
||||
encoder.close();
|
||||
} catch (ActiveMQException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
if (encoder != null) {
|
||||
try {
|
||||
encoder.close();
|
||||
} catch (ActiveMQException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void printMessageQueues(List<String> queues) throws XMLStreamException {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
|
||||
for (String queueName : queues) {
|
||||
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
|
||||
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
|
||||
}
|
||||
xmlWriter.writeEndElement(); // end QUEUES_PARENT
|
||||
}
|
||||
|
||||
private void printMessageProperties(Message message) throws XMLStreamException {
|
||||
xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
|
||||
for (SimpleString key : message.getPropertyNames()) {
|
||||
Object value = message.getObjectProperty(key);
|
||||
xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
|
||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
|
||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
|
||||
|
||||
// Write the property type as an attribute
|
||||
String propertyType = XmlDataExporterUtil.getPropertyType(value);
|
||||
if (propertyType != null) {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
|
||||
}
|
||||
}
|
||||
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
|
||||
}
|
||||
|
||||
private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
|
||||
String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
|
||||
if (message.getUserID() != null) {
|
||||
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
|
||||
private List<String> extractQueueNames(HashMap<Long, DescribeJournal.ReferenceDescribe> refMap) {
|
||||
List<String> queues = new ArrayList<>();
|
||||
for (ReferenceDescribe ref : refMap.values()) {
|
||||
for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
|
||||
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
|
||||
}
|
||||
return queues;
|
||||
|
@ -552,7 +459,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
/**
|
||||
* Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
|
||||
*/
|
||||
static class PrettyPrintHandler implements InvocationHandler {
|
||||
public static class PrettyPrintHandler implements InvocationHandler {
|
||||
|
||||
private final XMLStreamWriter target;
|
||||
|
||||
|
@ -564,7 +471,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
|
||||
boolean wrap = true;
|
||||
|
||||
PrettyPrintHandler(XMLStreamWriter target) {
|
||||
public PrettyPrintHandler(XMLStreamWriter target) {
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ public class XmlDataExporterUtil {
|
|||
/**
|
||||
* Base64 encode a ServerMessage body into the proper XML format
|
||||
*/
|
||||
static String encodeMessageBody(final Message message) throws Exception {
|
||||
static String encodeMessageBodyBase64(final Message message) throws Exception {
|
||||
Preconditions.checkNotNull(message, "ServerMessage can not be null");
|
||||
|
||||
ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
|
||||
|
|
|
@ -19,25 +19,18 @@ package org.apache.activemq.artemis.cli.commands.tools.xml;
|
|||
import javax.xml.XMLConstants;
|
||||
import javax.xml.stream.XMLInputFactory;
|
||||
import javax.xml.stream.XMLStreamConstants;
|
||||
import javax.xml.stream.XMLStreamException;
|
||||
import javax.xml.stream.XMLStreamReader;
|
||||
import javax.xml.transform.stax.StAXSource;
|
||||
import javax.xml.validation.Schema;
|
||||
import javax.xml.validation.SchemaFactory;
|
||||
import javax.xml.validation.Validator;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -47,7 +40,6 @@ import java.util.TreeSet;
|
|||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -68,10 +60,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.utils.Base64;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
import org.apache.activemq.artemis.utils.ListUtil;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
@ -86,6 +76,8 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
|
||||
private XMLStreamReader reader;
|
||||
|
||||
private XMLMessageImporter messageReader;
|
||||
|
||||
// this session is really only needed if the "session" variable does not auto-commit sends
|
||||
ClientSession managementSession;
|
||||
|
||||
|
@ -123,7 +115,7 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
@Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports")
|
||||
public boolean legacyPrefixes = false;
|
||||
|
||||
TreeSet<MessageTemp> messages;
|
||||
TreeSet<XMLMessageImporter.MessageInfo> messages;
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
|
@ -179,6 +171,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
ClientSession session,
|
||||
ClientSession managementSession) throws Exception {
|
||||
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
|
||||
messageReader = new XMLMessageImporter(reader, session);
|
||||
messageReader.setOldPrefixTranslation(oldPrefixTranslation);
|
||||
|
||||
this.session = session;
|
||||
if (managementSession != null) {
|
||||
this.managementSession = managementSession;
|
||||
|
@ -237,9 +232,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
|
||||
private void processXml() throws Exception {
|
||||
if (sort) {
|
||||
messages = new TreeSet<MessageTemp>(new Comparator<MessageTemp>() {
|
||||
messages = new TreeSet<XMLMessageImporter.MessageInfo>(new Comparator<XMLMessageImporter.MessageInfo>() {
|
||||
@Override
|
||||
public int compare(MessageTemp o1, MessageTemp o2) {
|
||||
public int compare(XMLMessageImporter.MessageInfo o1, XMLMessageImporter.MessageInfo o2) {
|
||||
if (o1.id == o2.id) {
|
||||
return 0;
|
||||
} else if (o1.id > o2.id) {
|
||||
|
@ -270,8 +265,8 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
|
||||
if (sort) {
|
||||
for (MessageTemp msgtmp : messages) {
|
||||
sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName);
|
||||
for (XMLMessageImporter.MessageInfo msgtmp : messages) {
|
||||
sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -288,118 +283,14 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
|
||||
private void processMessage() throws Exception {
|
||||
Byte type = 0;
|
||||
Byte priority = 0;
|
||||
Long expiration = 0L;
|
||||
Long timestamp = 0L;
|
||||
Long id = 0L;
|
||||
org.apache.activemq.artemis.utils.UUID userId = null;
|
||||
ArrayList<String> queues = new ArrayList<>();
|
||||
|
||||
// get message's attributes
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
String attributeName = reader.getAttributeLocalName(i);
|
||||
switch (attributeName) {
|
||||
case XmlDataConstants.MESSAGE_TYPE:
|
||||
type = getMessageType(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_PRIORITY:
|
||||
priority = Byte.parseByte(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_EXPIRATION:
|
||||
expiration = Long.parseLong(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_TIMESTAMP:
|
||||
timestamp = Long.parseLong(reader.getAttributeValue(i));
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_USER_ID:
|
||||
userId = UUIDGenerator.getInstance().generateUUID();
|
||||
break;
|
||||
case XmlDataConstants.MESSAGE_ID:
|
||||
id = Long.parseLong(reader.getAttributeValue(i));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Message message = session.createMessage(type, true, expiration, timestamp, priority);
|
||||
message.setUserID(userId);
|
||||
|
||||
boolean endLoop = false;
|
||||
|
||||
File largeMessageTemporaryFile = null;
|
||||
// loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
|
||||
while (reader.hasNext()) {
|
||||
int eventType = reader.getEventType();
|
||||
switch (eventType) {
|
||||
case XMLStreamConstants.START_ELEMENT:
|
||||
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
|
||||
largeMessageTemporaryFile = processMessageBody(message.toCore());
|
||||
} else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
|
||||
processMessageProperties(message);
|
||||
} else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
|
||||
processMessageQueues(queues);
|
||||
}
|
||||
break;
|
||||
case XMLStreamConstants.END_ELEMENT:
|
||||
if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
|
||||
endLoop = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (endLoop) {
|
||||
break;
|
||||
}
|
||||
reader.next();
|
||||
}
|
||||
|
||||
XMLMessageImporter.MessageInfo info = messageReader.readMessage(false);
|
||||
if (sort) {
|
||||
messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile));
|
||||
messages.add(info);
|
||||
} else {
|
||||
sendMessage(queues, message, largeMessageTemporaryFile);
|
||||
sendMessage(info.queues, info.message, info.tempFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MessageTemp {
|
||||
long id;
|
||||
List<String> queues;
|
||||
Message message;
|
||||
File tempFileName;
|
||||
|
||||
MessageTemp(long id, List<String> queues, Message message, File tempFileName) {
|
||||
this.message = message;
|
||||
this.queues = queues;
|
||||
this.message = message;
|
||||
this.id = id;
|
||||
this.tempFileName = tempFileName;
|
||||
}
|
||||
}
|
||||
|
||||
private Byte getMessageType(String value) {
|
||||
Byte type = Message.DEFAULT_TYPE;
|
||||
switch (value) {
|
||||
case XmlDataConstants.DEFAULT_TYPE_PRETTY:
|
||||
type = Message.DEFAULT_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.BYTES_TYPE_PRETTY:
|
||||
type = Message.BYTES_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.MAP_TYPE_PRETTY:
|
||||
type = Message.MAP_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.OBJECT_TYPE_PRETTY:
|
||||
type = Message.OBJECT_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.STREAM_TYPE_PRETTY:
|
||||
type = Message.STREAM_TYPE;
|
||||
break;
|
||||
case XmlDataConstants.TEXT_TYPE_PRETTY:
|
||||
type = Message.TEXT_TYPE;
|
||||
break;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
private void sendMessage(List<String> queues, Message message, File tempFileName) throws Exception {
|
||||
StringBuilder logMessage = new StringBuilder();
|
||||
String destination = addressMap.get(queues.get(0));
|
||||
|
@ -460,153 +351,6 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
private void processMessageQueues(ArrayList<String> queues) {
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
|
||||
String queueName = reader.getAttributeValue(i);
|
||||
String translation = checkPrefix(queueName);
|
||||
queues.add(translation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String checkPrefix(String queueName) {
|
||||
String newQueueName = oldPrefixTranslation.get(queueName);
|
||||
if (newQueueName == null) {
|
||||
newQueueName = queueName;
|
||||
}
|
||||
return newQueueName;
|
||||
}
|
||||
|
||||
private void processMessageProperties(Message message) {
|
||||
String key = "";
|
||||
String value = "";
|
||||
String propertyType = "";
|
||||
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
String attributeName = reader.getAttributeLocalName(i);
|
||||
switch (attributeName) {
|
||||
case XmlDataConstants.PROPERTY_NAME:
|
||||
key = reader.getAttributeValue(i);
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_VALUE:
|
||||
value = reader.getAttributeValue(i);
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE:
|
||||
propertyType = reader.getAttributeValue(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (value.equals(XmlDataConstants.NULL)) {
|
||||
value = null;
|
||||
}
|
||||
|
||||
switch (propertyType) {
|
||||
case XmlDataConstants.PROPERTY_TYPE_SHORT:
|
||||
message.putShortProperty(key, Short.parseShort(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
|
||||
message.putBooleanProperty(key, Boolean.parseBoolean(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_BYTE:
|
||||
message.putByteProperty(key, Byte.parseByte(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_BYTES:
|
||||
message.putBytesProperty(key, value == null ? null : decode(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
|
||||
message.putDoubleProperty(key, Double.parseDouble(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_FLOAT:
|
||||
message.putFloatProperty(key, Float.parseFloat(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_INTEGER:
|
||||
message.putIntProperty(key, Integer.parseInt(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_LONG:
|
||||
message.putLongProperty(key, Long.parseLong(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
|
||||
message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
|
||||
break;
|
||||
case XmlDataConstants.PROPERTY_TYPE_STRING:
|
||||
message.putStringProperty(key, value);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
|
||||
File tempFileName = null;
|
||||
boolean isLarge = false;
|
||||
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
String attributeName = reader.getAttributeLocalName(i);
|
||||
if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
|
||||
isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
|
||||
}
|
||||
}
|
||||
reader.next();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("XMLStreamReader impl: " + reader);
|
||||
}
|
||||
if (isLarge) {
|
||||
tempFileName = File.createTempFile("largeMessage", ".tmp");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Creating temp file " + tempFileName + " for large message.");
|
||||
}
|
||||
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
|
||||
getMessageBodyBytes(new MessageBodyBytesProcessor() {
|
||||
@Override
|
||||
public void processBodyBytes(byte[] bytes) throws IOException {
|
||||
out.write(bytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
FileInputStream fileInputStream = new FileInputStream(tempFileName);
|
||||
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
|
||||
((ClientMessage) message).setBodyInputStream(bufferedInput);
|
||||
} else {
|
||||
getMessageBodyBytes(new MessageBodyBytesProcessor() {
|
||||
@Override
|
||||
public void processBodyBytes(byte[] bytes) throws IOException {
|
||||
message.getBodyBuffer().writeBytes(bytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return tempFileName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
|
||||
* read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
|
||||
* to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
|
||||
* CDATA has to be decoded in its entirety.
|
||||
*
|
||||
* @param processor used to deal with the decoded CDATA elements
|
||||
*/
|
||||
private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
|
||||
int currentEventType;
|
||||
StringBuilder cdata = new StringBuilder();
|
||||
while (reader.hasNext()) {
|
||||
currentEventType = reader.getEventType();
|
||||
if (currentEventType == XMLStreamConstants.END_ELEMENT) {
|
||||
break;
|
||||
} else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
|
||||
/* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
|
||||
* the processor, and reset the cdata for the next event(s)
|
||||
*/
|
||||
processor.processBodyBytes(decode(cdata.toString()));
|
||||
cdata.setLength(0);
|
||||
} else {
|
||||
cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
|
||||
}
|
||||
reader.next();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void oldBinding() throws Exception {
|
||||
String queueName = "";
|
||||
String address = "";
|
||||
|
@ -762,11 +506,5 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
private static byte[] decode(String data) {
|
||||
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
||||
}
|
||||
|
||||
private interface MessageBodyBytesProcessor {
|
||||
void processBodyBytes(byte[] bytes) throws IOException;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue