diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java index 8a355d14bb..f73a1b1d5e 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; /** * Read XML output from org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter, create a core session, and @@ -65,6 +66,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; public final class XmlDataImporter extends ActionAbstract { // Constants ----------------------------------------------------- + private static final Logger logger = Logger.getLogger(XmlDataImporter.class); + // Attributes ---------------------------------------------------- private XMLStreamReader reader; @@ -192,7 +195,9 @@ public final class XmlDataImporter extends ActionAbstract { private void processXml() throws Exception { try { while (reader.hasNext()) { - ActiveMQServerLogger.LOGGER.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] "); + if (logger.isDebugEnabled()) { + logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] "); + } if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) { bindQueue(); @@ -334,12 +339,16 @@ public final class XmlDataImporter extends ActionAbstract { ClientMessage managementMessage = managementSession.createMessage(false); ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID"); managementSession.start(); - ActiveMQServerLogger.LOGGER.debug("Requesting ID for: " + queue); + if (logger.isDebugEnabled()) { + logger.debug("Requesting ID for: " + queue); + } ClientMessage reply = requestor.request(managementMessage); Number idObject = (Number) ManagementHelper.getResult(reply); queueID = idObject.longValue(); requestor.close(); - ActiveMQServerLogger.LOGGER.debug("ID for " + queue + " is: " + queueID); + if (logger.isDebugEnabled()) { + logger.debug("ID for " + queue + " is: " + queueID); + } queueIDs.put(queue, queueID); // store it so we don't have to look it up every time } @@ -348,7 +357,9 @@ public final class XmlDataImporter extends ActionAbstract { } logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma - ActiveMQServerLogger.LOGGER.debug(logMessage); + if (logger.isDebugEnabled()) { + logger.debug(logMessage); + } message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); try (ClientProducer producer = session.createProducer(destination)) { @@ -434,7 +445,7 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageBody(Message message) throws XMLStreamException, IOException { + private void processMessageBody(final Message message) throws XMLStreamException, IOException { boolean isLarge = false; for (int i = 0; i < reader.getAttributeCount(); i++) { @@ -444,33 +455,65 @@ public final class XmlDataImporter extends ActionAbstract { } } reader.next(); + if (logger.isDebugEnabled()) { + logger.debug("XMLStreamReader impl: " + reader); + } if (isLarge) { tempFileName = UUID.randomUUID().toString() + ".tmp"; - ActiveMQServerLogger.LOGGER.debug("Creating temp file " + tempFileName + " for large message."); + if (logger.isDebugEnabled()) { + logger.debug("Creating temp file " + tempFileName + " for large message."); + } try (OutputStream out = new FileOutputStream(tempFileName)) { - while (reader.hasNext()) { - if (reader.getEventType() == XMLStreamConstants.END_ELEMENT) { - break; + getMessageBodyBytes(new MessageBodyBytesProcessor() { + @Override + public void processBodyBytes(byte[] bytes) throws IOException { + out.write(bytes); } - else { - String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()); - String trimmedCharacters = characters.trim(); - if (trimmedCharacters.length() > 0) { // this will skip "indentation" characters - byte[] data = decode(trimmedCharacters); - out.write(data); - } - } - reader.next(); - } + }); } FileInputStream fileInputStream = new FileInputStream(tempFileName); BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); ((ClientMessage) message).setBodyInputStream(bufferedInput); } else { - reader.next(); // step past the "indentation" characters to get to the CDATA with the message body - String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()); - message.getBodyBuffer().writeBytes(decode(characters.trim())); + getMessageBodyBytes(new MessageBodyBytesProcessor() { + @Override + public void processBodyBytes(byte[] bytes) throws IOException { + message.getBodyBuffer().writeBytes(bytes); + } + }); + } + } + + /** + * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't + * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need + * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each + * CDATA has to be decoded in its entirety. + * + * @param processor used to deal with the decoded CDATA elements + * @throws IOException + * @throws XMLStreamException + */ + private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException { + int currentEventType; + StringBuilder cdata = new StringBuilder(); + while (reader.hasNext()) { + currentEventType = reader.getEventType(); + if (currentEventType == XMLStreamConstants.END_ELEMENT) { + break; + } + /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to + * the processor, and reset the cdata for the next event(s) + */ + else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) { + processor.processBodyBytes(decode(cdata.toString())); + cdata.setLength(0); + } + else { + cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim()); + } + reader.next(); } } @@ -498,10 +541,14 @@ public final class XmlDataImporter extends ActionAbstract { if (!queueQuery.isExists()) { session.createQueue(address, queueName, filter, true); - ActiveMQServerLogger.LOGGER.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")"); + if (logger.isDebugEnabled()) { + logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")"); + } } else { - ActiveMQServerLogger.LOGGER.debug("Binding " + queueName + " already exists so won't re-bind."); + if (logger.isDebugEnabled()) { + logger.debug("Binding " + queueName + " already exists so won't re-bind."); + } } addressMap.put(queueName, address); @@ -601,123 +648,183 @@ public final class XmlDataImporter extends ActionAbstract { case XMLStreamConstants.START_ELEMENT: if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) { callFailoverTimeout = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) { callTimeout = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory callTimeout: " + callTimeout); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory callTimeout: " + callTimeout); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) { clientFailureCheckPeriod = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) { clientId = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientId: " + clientId); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory clientId: " + clientId); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) { confirmationWindowSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) { connectionTtl = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory connectionTtl: " + connectionTtl); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory connectionTtl: " + connectionTtl); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) { connectors = getConnectors(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory getLocalName: " + connectors); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory getLocalName: " + connectors); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) { consumerMaxRate = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) { consumerWindowSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) { discoveryGroupName = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) { dupsOkBatchSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) { groupId = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory groupId: " + groupId); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory groupId: " + groupId); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) { loadBalancingPolicyClassName = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) { maxRetryInterval = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) { minLargeMessageSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) { name = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory name: " + name); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory name: " + name); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) { producerMaxRate = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerMaxRate: " + producerMaxRate); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory producerMaxRate: " + producerMaxRate); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) { producerWindowSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerWindowSize: " + producerWindowSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory producerWindowSize: " + producerWindowSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) { reconnectAttempts = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) { retryInterval = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryInterval: " + retryInterval); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory retryInterval: " + retryInterval); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) { retryIntervalMultiplier = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) { scheduledThreadMaxPoolSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) { threadMaxPoolSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) { transactionBatchSize = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) { type = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory type: " + type); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory type: " + type); + } } else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) { entries = getEntries(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory entries: " + entries); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory entries: " + entries); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) { autoGroup = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory autoGroup: " + autoGroup); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory autoGroup: " + autoGroup); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) { blockOnAcknowledge = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) { blockOnDurableSend = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) { blockOnNonDurableSend = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) { cacheLargeMessagesClient = reader.getElementText(); @@ -725,23 +832,33 @@ public final class XmlDataImporter extends ActionAbstract { } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) { compressLargeMessages = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) { failoverOnInitialConnection = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) { ha = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory ha: " + ha); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory ha: " + ha); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) { preacknowledge = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory preacknowledge: " + preacknowledge); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory preacknowledge: " + preacknowledge); + } } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) { useGlobalPools = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS connection factory useGlobalPools: " + useGlobalPools); + if (logger.isDebugEnabled()) { + logger.debug("JMS connection factory useGlobalPools: " + useGlobalPools); + } } break; case XMLStreamConstants.END_ELEMENT: @@ -763,7 +880,9 @@ public final class XmlDataImporter extends ActionAbstract { managementSession.start(); ClientMessage reply = requestor.request(managementMessage); if (ManagementHelper.hasOperationSucceeded(reply)) { - ActiveMQServerLogger.LOGGER.debug("Created connection factory " + name); + if (logger.isDebugEnabled()) { + logger.debug("Created connection factory " + name); + } } else { ActiveMQServerLogger.LOGGER.error("Problem creating " + name); @@ -785,15 +904,21 @@ public final class XmlDataImporter extends ActionAbstract { case XMLStreamConstants.START_ELEMENT: if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) { name = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS destination name: " + name); + if (logger.isDebugEnabled()) { + logger.debug("JMS destination name: " + name); + } } else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) { selector = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS destination selector: " + selector); + if (logger.isDebugEnabled()) { + logger.debug("JMS destination selector: " + selector); + } } else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) { type = reader.getElementText(); - ActiveMQServerLogger.LOGGER.debug("JMS destination type: " + type); + if (logger.isDebugEnabled()) { + logger.debug("JMS destination type: " + type); + } } else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) { entries = getEntries(); @@ -822,7 +947,9 @@ public final class XmlDataImporter extends ActionAbstract { managementSession.start(); ClientMessage reply = requestor.request(managementMessage); if (ManagementHelper.hasOperationSucceeded(reply)) { - ActiveMQServerLogger.LOGGER.debug("Created " + type.toLowerCase() + " " + name); + if (logger.isDebugEnabled()) { + logger.debug("Created " + type.toLowerCase() + " " + name); + } } else { ActiveMQServerLogger.LOGGER.error("Problem creating " + name); @@ -842,7 +969,9 @@ public final class XmlDataImporter extends ActionAbstract { if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName())) { String elementText = reader.getElementText(); entry.append(elementText).append(", "); - ActiveMQServerLogger.LOGGER.debug("JMS admin object JNDI entry: " + entry.toString()); + if (logger.isDebugEnabled()) { + logger.debug("JMS admin object JNDI entry: " + entry.toString()); + } } break; case XMLStreamConstants.END_ELEMENT: @@ -897,6 +1026,11 @@ public final class XmlDataImporter extends ActionAbstract { return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); } + private interface MessageBodyBytesProcessor { + + void processBodyBytes(byte[] bytes) throws IOException; + } + // Inner classes ------------------------------------------------- } diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index c6f9834c27..4f56ca8747 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -376,6 +376,18 @@ compile + + org.codehaus.woodstox + woodstox-core-asl + 4.4.0 + test + + + javax.xml.stream + stax-api + + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index 662fef75fe..bdd9255182 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -18,15 +18,18 @@ package org.apache.activemq.artemis.tests.integration.persistence; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.UUID; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -38,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; @@ -52,6 +56,7 @@ import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.tests.unit.util.InVMContext; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.junit.Test; import org.junit.runner.RunWith; @@ -259,6 +264,94 @@ public class XmlImportExportTest extends ActiveMQTestBase { assertEquals(Message.DEFAULT_TYPE, msg.getType()); } + @Test + public void testTextMessage() throws Exception { + StringBuilder data = new StringBuilder(); + for (int i = 0; i < 2608; i++) { + data.append("X"); + } + + ClientSession session = basicSetUp(); + + session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + + ClientProducer producer = session.createProducer(QUEUE_NAME); + ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true); + msg.getBodyBuffer().writeString(data.toString()); + producer.send(msg); + + session.close(); + locator.close(); + server.stop(); + + ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); + XmlDataExporter xmlDataExporter = new XmlDataExporter(); + xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); + System.out.print(new String(xmlOutputStream.toByteArray())); + + clearDataRecreateServerDirs(); + server.start(); + checkForLongs(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + session = factory.createSession(false, true, true); + + ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.process(xmlInputStream, session); + ClientConsumer consumer = session.createConsumer(QUEUE_NAME); + session.start(); + + msg = consumer.receive(CONSUMER_TIMEOUT); + assertEquals(Message.TEXT_TYPE, msg.getType()); + assertEquals(data.toString(), msg.getBodyBuffer().readString()); + } + + @Test + public void testBytesMessage() throws Exception { + StringBuilder data = new StringBuilder(); + for (int i = 0; i < 2610; i++) { + data.append("X"); + } + + ClientSession session = basicSetUp(); + + session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + + ClientProducer producer = session.createProducer(QUEUE_NAME); + ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true); + msg.getBodyBuffer().writeBytes(data.toString().getBytes()); + producer.send(msg); + + session.close(); + locator.close(); + server.stop(); + + ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); + XmlDataExporter xmlDataExporter = new XmlDataExporter(); + xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); + System.out.print(new String(xmlOutputStream.toByteArray())); + + clearDataRecreateServerDirs(); + server.start(); + checkForLongs(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + session = factory.createSession(false, true, true); + + ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.process(xmlInputStream, session); + ClientConsumer consumer = session.createConsumer(QUEUE_NAME); + session.start(); + + msg = consumer.receive(CONSUMER_TIMEOUT); + assertEquals(Message.BYTES_TYPE, msg.getType()); + byte[] result = new byte[msg.getBodySize()]; + msg.getBodyBuffer().readBytes(result); + assertEquals(data.toString().getBytes().length, result.length); + } + @Test public void testMessageAttributes() throws Exception { @@ -570,6 +663,54 @@ public class XmlImportExportTest extends ActiveMQTestBase { session.commit(); } + @Test + public void testLargeJmsTextMessage() throws Exception { + basicSetUp(); + ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test"); + Connection c = cf.createConnection(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A")); + p.setDeliveryMode(DeliveryMode.PERSISTENT); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 1024 * 200; i++) { + stringBuilder.append(RandomUtil.randomChar()); + } + TextMessage textMessage = s.createTextMessage(stringBuilder.toString()); + textMessage.setStringProperty("_AMQ_DUPL_ID", String.valueOf(UUID.randomUUID())); + p.send(textMessage); + c.close(); + + locator.close(); + server.stop(); + + ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); + XmlDataExporter xmlDataExporter = new XmlDataExporter(); + xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); + System.out.print(new String(xmlOutputStream.toByteArray())); + + clearDataRecreateServerDirs(); + server.start(); + checkForLongs(); + locator = createInVMNonHALocator(); + factory = createSessionFactory(locator); + ClientSession session = factory.createSession(false, true, true); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.process(inputStream, session); + session.close(); + + c = cf.createConnection(); + s = c.createSession(); + MessageConsumer mc = s.createConsumer(ActiveMQJMSClient.createQueue("A")); + c.start(); + javax.jms.Message msg = mc.receive(CONSUMER_TIMEOUT); + + assertNotNull(msg); + + c.close(); + } + @Test public void testPartialQueue() throws Exception { ClientSession session = basicSetUp();