From 15d693dd4edcc2e731e088c2eb6a496f088afaa3 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Thu, 2 Feb 2017 12:09:39 -0600 Subject: [PATCH] ARTEMIS-943 update/doc XML import/export --- .../cli/commands/tools/XmlDataConstants.java | 20 +- .../cli/commands/tools/XmlDataExporter.java | 338 ++------------ .../cli/commands/tools/XmlDataImporter.java | 437 ++++-------------- .../schema/artemis-import-export.xsd | 248 ++++++++++ .../activemq/artemis/utils/ListUtil.java | 35 ++ .../impl/ActiveMQServerControlImpl.java | 19 +- .../core/server/impl/FileLockNodeManager.java | 3 + .../persistence/XmlImportExportTest.java | 117 ++++- 8 files changed, 508 insertions(+), 709 deletions(-) create mode 100644 artemis-cli/src/main/resources/schema/artemis-import-export.xsd create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ListUtil.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java index 6d808c898b..b4ebe49a2c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java @@ -29,13 +29,19 @@ public final class XmlDataConstants { static final String XML_VERSION = "1.0"; static final String DOCUMENT_PARENT = "activemq-journal"; static final String BINDINGS_PARENT = "bindings"; - static final String BINDINGS_CHILD = "binding"; - static final String BINDING_ADDRESS = "address"; - static final String BINDING_FILTER_STRING = "filter-string"; - static final String BINDING_QUEUE_NAME = "queue-name"; - static final String BINDING_ID = "id"; - static final String JMS_CONNECTION_FACTORY = "jms-connection-factory"; - static final String JMS_CONNECTION_FACTORIES = "jms-connection-factories"; + + static final String QUEUE_BINDINGS_CHILD = "queue-binding"; + static final String QUEUE_BINDING_ADDRESS = "address"; + static final String QUEUE_BINDING_FILTER_STRING = "filter-string"; + static final String QUEUE_BINDING_NAME = "name"; + static final String QUEUE_BINDING_ID = "id"; + static final String QUEUE_BINDING_ROUTING_TYPE = "routing-type"; + + static final String ADDRESS_BINDINGS_CHILD = "address-binding"; + static final String ADDRESS_BINDING_NAME = "name"; + static final String ADDRESS_BINDING_ID = "id"; + static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types"; + static final String MESSAGES_PARENT = "messages"; static final String MESSAGES_CHILD = "message"; static final String MESSAGE_ID = "id"; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index 8030ce2f5a..1858308467 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -31,7 +31,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,14 +41,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -74,19 +69,16 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; -import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings; -import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory; -import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination; -import org.apache.activemq.artemis.jms.persistence.config.PersistedType; -import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -115,11 +107,7 @@ public final class XmlDataExporter extends OptionalLocking { private final HashMap queueBindings = new HashMap<>(); - private final Map jmsConnectionFactories = new ConcurrentHashMap<>(); - - private final Map, PersistedDestination> jmsDestinations = new ConcurrentHashMap<>(); - - private final Map, PersistedBindings> jmsJNDI = new ConcurrentHashMap<>(); + private final HashMap addressBindings = new HashMap<>(); long messagesPrinted = 0L; @@ -161,7 +149,6 @@ public final class XmlDataExporter extends OptionalLocking { private void writeXMLData() throws Exception { long start = System.currentTimeMillis(); getBindings(); - getJmsBindings(); processMessageJournal(); printDataAsXML(); ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms"); @@ -298,58 +285,6 @@ public final class XmlDataExporter extends OptionalLocking { } } - private void getJmsBindings() throws Exception { - SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); - - Journal jmsJournal = new JournalImpl(1024 * 1024, 2, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1); - - jmsJournal.start(); - - List data = new ArrayList<>(); - - ArrayList list = new ArrayList<>(); - - ActiveMQServerLogger.LOGGER.debug("Reading jms bindings journal from " + config.getBindingsDirectory()); - - jmsJournal.load(data, list, null); - - for (RecordInfo record : data) { - long id = record.id; - - ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(record.data); - - byte rec = record.getUserRecordType(); - - if (rec == JMSJournalStorageManagerImpl.CF_RECORD) { - PersistedConnectionFactory cf = new PersistedConnectionFactory(); - cf.decode(buffer); - cf.setId(id); - ActiveMQServerLogger.LOGGER.info("Found JMS connection factory: " + cf.getName()); - jmsConnectionFactories.put(cf.getName(), cf); - } else if (rec == JMSJournalStorageManagerImpl.DESTINATION_RECORD) { - PersistedDestination destination = new PersistedDestination(); - destination.decode(buffer); - destination.setId(id); - ActiveMQServerLogger.LOGGER.info("Found JMS destination: " + destination.getName()); - jmsDestinations.put(new Pair<>(destination.getType(), destination.getName()), destination); - } else if (rec == JMSJournalStorageManagerImpl.BINDING_RECORD) { - PersistedBindings jndi = new PersistedBindings(); - jndi.decode(buffer); - jndi.setId(id); - Pair key = new Pair<>(jndi.getType(), jndi.getName()); - StringBuilder builder = new StringBuilder(); - for (String binding : jndi.getBindings()) { - builder.append(binding).append(" "); - } - ActiveMQServerLogger.LOGGER.info("Found JMS JNDI binding data for " + jndi.getType() + " " + jndi.getName() + ": " + builder.toString()); - jmsJNDI.put(key, jndi); - } else { - throw new IllegalStateException("Invalid record type " + rec); - } - - } - } - /** * Open the bindings journal and extract all bindings data. * @@ -370,6 +305,9 @@ public final class XmlDataExporter extends OptionalLocking { if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) { PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null); queueBindings.put(bindingEncoding.getId(), bindingEncoding); + } else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) { + PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null); + addressBindings.put(bindingEncoding.getId(), bindingEncoding); } } @@ -381,8 +319,6 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION); xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT); printBindingsAsXML(); - printJmsConnectionFactoriesAsXML(); - printJmsDestinationsAsXML(); printAllMessagesAsXML(); xmlWriter.writeEndElement(); // end DOCUMENT_PARENT xmlWriter.writeEndDocument(); @@ -395,256 +331,35 @@ public final class XmlDataExporter extends OptionalLocking { private void printBindingsAsXML() throws XMLStreamException { xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT); + for (Map.Entry addressBindingEncodingEntry : addressBindings.entrySet()) { + PersistentAddressBindingEncoding bindingEncoding = addressBindings.get(addressBindingEncodingEntry.getKey()); + xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD); + StringBuilder routingTypes = new StringBuilder(); + for (RoutingType routingType : bindingEncoding.getRoutingTypes()) { + routingTypes.append(routingType.toString()).append(", "); + } + xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString().substring(0, routingTypes.length() - 2)); + xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString()); + xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId())); + bindingsPrinted++; + } for (Map.Entry queueBindingEncodingEntry : queueBindings.entrySet()) { PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey()); - xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD); - xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString()); + xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString()); String filter = ""; if (bindingEncoding.getFilterString() != null) { filter = bindingEncoding.getFilterString().toString(); } - xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter); - xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString()); - xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId())); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString()); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId())); + xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString()); bindingsPrinted++; } xmlWriter.writeEndElement(); // end BINDINGS_PARENT } - private void printJmsConnectionFactoriesAsXML() throws XMLStreamException { - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORIES); - for (String jmsConnectionFactoryKey : jmsConnectionFactories.keySet()) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY); - PersistedConnectionFactory jmsConnectionFactory = jmsConnectionFactories.get(jmsConnectionFactoryKey); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_NAME); - xmlWriter.writeCharacters(jmsConnectionFactory.getName()); - xmlWriter.writeEndElement(); - String clientID = jmsConnectionFactory.getConfig().getClientID(); - if (clientID != null) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID); - xmlWriter.writeCharacters(clientID); - xmlWriter.writeEndElement(); - } - - long callFailoverTimeout = jmsConnectionFactory.getConfig().getCallFailoverTimeout(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT); - xmlWriter.writeCharacters(Long.toString(callFailoverTimeout)); - xmlWriter.writeEndElement(); - - long callTimeout = jmsConnectionFactory.getConfig().getCallTimeout(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT); - xmlWriter.writeCharacters(Long.toString(callTimeout)); - xmlWriter.writeEndElement(); - - long clientFailureCheckPeriod = jmsConnectionFactory.getConfig().getClientFailureCheckPeriod(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD); - xmlWriter.writeCharacters(Long.toString(clientFailureCheckPeriod)); - xmlWriter.writeEndElement(); - - int confirmationWindowSize = jmsConnectionFactory.getConfig().getConfirmationWindowSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE); - xmlWriter.writeCharacters(Integer.toString(confirmationWindowSize)); - xmlWriter.writeEndElement(); - - long connectionTTL = jmsConnectionFactory.getConfig().getConnectionTTL(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL); - xmlWriter.writeCharacters(Long.toString(connectionTTL)); - xmlWriter.writeEndElement(); - - long consumerMaxRate = jmsConnectionFactory.getConfig().getConsumerMaxRate(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE); - xmlWriter.writeCharacters(Long.toString(consumerMaxRate)); - xmlWriter.writeEndElement(); - - long consumerWindowSize = jmsConnectionFactory.getConfig().getConsumerWindowSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE); - xmlWriter.writeCharacters(Long.toString(consumerWindowSize)); - xmlWriter.writeEndElement(); - - String discoveryGroupName = jmsConnectionFactory.getConfig().getDiscoveryGroupName(); - if (discoveryGroupName != null) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME); - xmlWriter.writeCharacters(discoveryGroupName); - xmlWriter.writeEndElement(); - } - - int dupsOKBatchSize = jmsConnectionFactory.getConfig().getDupsOKBatchSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE); - xmlWriter.writeCharacters(Integer.toString(dupsOKBatchSize)); - xmlWriter.writeEndElement(); - - JMSFactoryType factoryType = jmsConnectionFactory.getConfig().getFactoryType(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE); - xmlWriter.writeCharacters(Integer.toString(factoryType.intValue())); - xmlWriter.writeEndElement(); - - String groupID = jmsConnectionFactory.getConfig().getGroupID(); - if (groupID != null) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID); - xmlWriter.writeCharacters(groupID); - xmlWriter.writeEndElement(); - } - - String loadBalancingPolicyClassName = jmsConnectionFactory.getConfig().getLoadBalancingPolicyClassName(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME); - xmlWriter.writeCharacters(loadBalancingPolicyClassName); - xmlWriter.writeEndElement(); - - long maxRetryInterval = jmsConnectionFactory.getConfig().getMaxRetryInterval(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL); - xmlWriter.writeCharacters(Long.toString(maxRetryInterval)); - xmlWriter.writeEndElement(); - - long minLargeMessageSize = jmsConnectionFactory.getConfig().getMinLargeMessageSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE); - xmlWriter.writeCharacters(Long.toString(minLargeMessageSize)); - xmlWriter.writeEndElement(); - - long producerMaxRate = jmsConnectionFactory.getConfig().getProducerMaxRate(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE); - xmlWriter.writeCharacters(Long.toString(producerMaxRate)); - xmlWriter.writeEndElement(); - - long producerWindowSize = jmsConnectionFactory.getConfig().getProducerWindowSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE); - xmlWriter.writeCharacters(Long.toString(producerWindowSize)); - xmlWriter.writeEndElement(); - - long reconnectAttempts = jmsConnectionFactory.getConfig().getReconnectAttempts(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS); - xmlWriter.writeCharacters(Long.toString(reconnectAttempts)); - xmlWriter.writeEndElement(); - - long retryInterval = jmsConnectionFactory.getConfig().getRetryInterval(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL); - xmlWriter.writeCharacters(Long.toString(retryInterval)); - xmlWriter.writeEndElement(); - - double retryIntervalMultiplier = jmsConnectionFactory.getConfig().getRetryIntervalMultiplier(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER); - xmlWriter.writeCharacters(Double.toString(retryIntervalMultiplier)); - xmlWriter.writeEndElement(); - - long scheduledThreadPoolMaxSize = jmsConnectionFactory.getConfig().getScheduledThreadPoolMaxSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE); - xmlWriter.writeCharacters(Long.toString(scheduledThreadPoolMaxSize)); - xmlWriter.writeEndElement(); - - long threadPoolMaxSize = jmsConnectionFactory.getConfig().getThreadPoolMaxSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE); - xmlWriter.writeCharacters(Long.toString(threadPoolMaxSize)); - xmlWriter.writeEndElement(); - - long transactionBatchSize = jmsConnectionFactory.getConfig().getTransactionBatchSize(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE); - xmlWriter.writeCharacters(Long.toString(transactionBatchSize)); - xmlWriter.writeEndElement(); - - boolean autoGroup = jmsConnectionFactory.getConfig().isAutoGroup(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP); - xmlWriter.writeCharacters(Boolean.toString(autoGroup)); - xmlWriter.writeEndElement(); - - boolean blockOnAcknowledge = jmsConnectionFactory.getConfig().isBlockOnAcknowledge(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE); - xmlWriter.writeCharacters(Boolean.toString(blockOnAcknowledge)); - xmlWriter.writeEndElement(); - - boolean blockOnDurableSend = jmsConnectionFactory.getConfig().isBlockOnDurableSend(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND); - xmlWriter.writeCharacters(Boolean.toString(blockOnDurableSend)); - xmlWriter.writeEndElement(); - - boolean blockOnNonDurableSend = jmsConnectionFactory.getConfig().isBlockOnNonDurableSend(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND); - xmlWriter.writeCharacters(Boolean.toString(blockOnNonDurableSend)); - xmlWriter.writeEndElement(); - - boolean cacheLargeMessagesClient = jmsConnectionFactory.getConfig().isCacheLargeMessagesClient(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT); - xmlWriter.writeCharacters(Boolean.toString(cacheLargeMessagesClient)); - xmlWriter.writeEndElement(); - - boolean compressLargeMessages = jmsConnectionFactory.getConfig().isCompressLargeMessages(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES); - xmlWriter.writeCharacters(Boolean.toString(compressLargeMessages)); - xmlWriter.writeEndElement(); - - boolean failoverOnInitialConnection = jmsConnectionFactory.getConfig().isFailoverOnInitialConnection(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION); - xmlWriter.writeCharacters(Boolean.toString(failoverOnInitialConnection)); - xmlWriter.writeEndElement(); - - boolean ha = jmsConnectionFactory.getConfig().isHA(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_HA); - xmlWriter.writeCharacters(Boolean.toString(ha)); - xmlWriter.writeEndElement(); - - boolean preAcknowledge = jmsConnectionFactory.getConfig().isPreAcknowledge(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE); - xmlWriter.writeCharacters(Boolean.toString(preAcknowledge)); - xmlWriter.writeEndElement(); - - boolean useGlobalPools = jmsConnectionFactory.getConfig().isUseGlobalPools(); - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS); - xmlWriter.writeCharacters(Boolean.toString(useGlobalPools)); - xmlWriter.writeEndElement(); - - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTORS); - for (String connector : jmsConnectionFactory.getConfig().getConnectorNames()) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR); - xmlWriter.writeCharacters(connector); - xmlWriter.writeEndElement(); - } - xmlWriter.writeEndElement(); - - xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRIES); - PersistedBindings jndi = jmsJNDI.get(new Pair<>(PersistedType.ConnectionFactory, jmsConnectionFactory.getName())); - for (String jndiEntry : jndi.getBindings()) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRY); - xmlWriter.writeCharacters(jndiEntry); - xmlWriter.writeEndElement(); - } - xmlWriter.writeEndElement(); // end jndi-entries - xmlWriter.writeEndElement(); // end JMS_CONNECTION_FACTORY - } - xmlWriter.writeEndElement(); - } - - private void printJmsDestinationsAsXML() throws XMLStreamException { - xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATIONS); - for (Pair jmsDestinationsKey : jmsDestinations.keySet()) { - PersistedDestination jmsDestination = jmsDestinations.get(jmsDestinationsKey); - xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION); - - xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_NAME); - xmlWriter.writeCharacters(jmsDestination.getName()); - xmlWriter.writeEndElement(); - - String selector = jmsDestination.getSelector(); - if (selector != null && selector.length() != 0) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_SELECTOR); - xmlWriter.writeCharacters(selector); - xmlWriter.writeEndElement(); - } - - xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_TYPE); - xmlWriter.writeCharacters(jmsDestination.getType().toString()); - xmlWriter.writeEndElement(); - - xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRIES); - PersistedBindings jndi = jmsJNDI.get(new Pair<>(jmsDestination.getType(), jmsDestination.getName())); - for (String jndiEntry : jndi.getBindings()) { - xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRY); - xmlWriter.writeCharacters(jndiEntry); - xmlWriter.writeEndElement(); - } - xmlWriter.writeEndElement(); // end jndi-entries - xmlWriter.writeEndElement(); // end JMS_CONNECTION_FACTORY - } - xmlWriter.writeEndElement(); - } - private void printAllMessagesAsXML() throws XMLStreamException { xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT); @@ -822,7 +537,10 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value == null ? XmlDataConstants.NULL : value.toString()); } - if (value instanceof Boolean) { + // if the value is null then we can't really know what it is so just set the type to the most generic thing + if (value == null) { + xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES); + } else if (value instanceof Boolean) { xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN); } else if (value instanceof Byte) { xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java index 225b14c3a9..a80c1f6a42 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java @@ -16,10 +16,15 @@ */ package org.apache.activemq.artemis.cli.commands.tools; +import javax.xml.XMLConstants; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamConstants; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamReader; +import javax.xml.transform.stax.StAXSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import javax.xml.validation.Validator; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; @@ -27,10 +32,15 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URL; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import io.airlift.airline.Command; @@ -53,7 +63,10 @@ import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.ClassloadingUtil; +import org.apache.activemq.artemis.utils.ListUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; @@ -75,7 +88,7 @@ public final class XmlDataImporter extends ActionAbstract { // this session is really only needed if the "session" variable does not auto-commit sends ClientSession managementSession; - boolean localSession; + boolean localSession = false; final Map addressMap = new HashMap<>(); @@ -164,20 +177,21 @@ public final class XmlDataImporter extends ActionAbstract { } else { this.managementSession = session; } - localSession = false; processXml(); } public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception { - reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); HashMap connectionParams = new HashMap<>(); connectionParams.put(TransportConstants.HOST_PROP_NAME, host); connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port)); ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams)); ClientSessionFactory sf = serverLocator.createSessionFactory(); + ClientSession session; + ClientSession managementSession; + if (user != null || password != null) { session = sf.createSession(user, password, false, !transactional, true, false, 0); managementSession = sf.createSession(user, password, false, true, true, false, 0); @@ -187,7 +201,30 @@ public final class XmlDataImporter extends ActionAbstract { } localSession = true; - processXml(); + process(inputStream, session, managementSession); + } + + public void validate(String file) throws Exception { + validate(new FileInputStream(file)); + } + + public void validate(InputStream inputStream) throws Exception { + XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream); + SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + Schema schema = factory.newSchema(XmlDataImporter.findResource("schema/artemis-import-export.xsd")); + + Validator validator = schema.newValidator(); + validator.validate(new StAXSource(reader)); + reader.close(); + } + + private static URL findResource(final String resourceName) { + return AccessController.doPrivileged(new PrivilegedAction() { + @Override + public URL run() { + return ClassloadingUtil.findResource(resourceName); + } + }); } private void processXml() throws Exception { @@ -197,14 +234,12 @@ public final class XmlDataImporter extends ActionAbstract { logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] "); } if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { - if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) { + if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) { bindQueue(); + } else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) { + bindAddress(); } else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) { processMessage(); - } else if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) { - createJmsConnectionFactories(); - } else if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) { - createJmsDestinations(); } } reader.next(); @@ -396,6 +431,10 @@ public final class XmlDataImporter extends ActionAbstract { } } + if (value.equals(XmlDataConstants.NULL)) { + value = null; + } + switch (propertyType) { case XmlDataConstants.PROPERTY_TYPE_SHORT: message.putShortProperty(key, Short.parseShort(value)); @@ -407,7 +446,7 @@ public final class XmlDataImporter extends ActionAbstract { message.putByteProperty(key, Byte.parseByte(value)); break; case XmlDataConstants.PROPERTY_TYPE_BYTES: - message.putBytesProperty(key, decode(value)); + message.putBytesProperty(key, value == null ? null : decode(value)); break; case XmlDataConstants.PROPERTY_TYPE_DOUBLE: message.putDoubleProperty(key, Double.parseDouble(value)); @@ -422,16 +461,10 @@ public final class XmlDataImporter extends ActionAbstract { message.putLongProperty(key, Long.parseLong(value)); break; case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING: - if (!value.equals(XmlDataConstants.NULL)) { - realSimpleStringValue = new SimpleString(value); - } - message.putStringProperty(new SimpleString(key), realSimpleStringValue); + message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value)); break; case XmlDataConstants.PROPERTY_TYPE_STRING: - if (!value.equals(XmlDataConstants.NULL)) { - realStringValue = value; - } - message.putStringProperty(key, realStringValue); + message.putStringProperty(key, value); break; } } @@ -509,26 +542,30 @@ public final class XmlDataImporter extends ActionAbstract { String queueName = ""; String address = ""; String filter = ""; + String routingType = ""; for (int i = 0; i < reader.getAttributeCount(); i++) { String attributeName = reader.getAttributeLocalName(i); switch (attributeName) { - case XmlDataConstants.BINDING_ADDRESS: + case XmlDataConstants.QUEUE_BINDING_ADDRESS: address = reader.getAttributeValue(i); break; - case XmlDataConstants.BINDING_QUEUE_NAME: + case XmlDataConstants.QUEUE_BINDING_NAME: queueName = reader.getAttributeValue(i); break; - case XmlDataConstants.BINDING_FILTER_STRING: + case XmlDataConstants.QUEUE_BINDING_FILTER_STRING: filter = reader.getAttributeValue(i); break; + case XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE: + routingType = reader.getAttributeValue(i); + break; } } ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName)); if (!queueQuery.isExists()) { - session.createQueue(address, queueName, filter, true); + session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true); if (logger.isDebugEnabled()) { logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")"); } @@ -541,350 +578,36 @@ public final class XmlDataImporter extends ActionAbstract { addressMap.put(queueName, address); } - private void createJmsConnectionFactories() throws Exception { - boolean endLoop = false; + private void bindAddress() throws Exception { + String addressName = ""; + String routingTypes = ""; - while (reader.hasNext()) { - int eventType = reader.getEventType(); - switch (eventType) { - case XMLStreamConstants.START_ELEMENT: - if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) { - createJmsConnectionFactory(); - } + for (int i = 0; i < reader.getAttributeCount(); i++) { + String attributeName = reader.getAttributeLocalName(i); + switch (attributeName) { + case XmlDataConstants.ADDRESS_BINDING_NAME: + addressName = reader.getAttributeValue(i); break; - case XMLStreamConstants.END_ELEMENT: - if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) { - endLoop = true; - } + case XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE: + routingTypes = reader.getAttributeValue(i); break; } - if (endLoop) { - break; - } - reader.next(); - } - } - - private void createJmsDestinations() throws Exception { - boolean endLoop = false; - - while (reader.hasNext()) { - int eventType = reader.getEventType(); - switch (eventType) { - case XMLStreamConstants.START_ELEMENT: - if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) { - createJmsDestination(); - } - break; - case XMLStreamConstants.END_ELEMENT: - if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) { - endLoop = true; - } - break; - } - if (endLoop) { - break; - } - reader.next(); - } - } - - private void createJmsConnectionFactory() throws Exception { - String name = ""; - String callFailoverTimeout = ""; - String callTimeout = ""; - String clientFailureCheckPeriod = ""; - String clientId = ""; - String confirmationWindowSize = ""; - String connectionTtl = ""; - String connectors = ""; - String consumerMaxRate = ""; - String consumerWindowSize = ""; - String discoveryGroupName = ""; - String dupsOkBatchSize = ""; - String groupId = ""; - String loadBalancingPolicyClassName = ""; - String maxRetryInterval = ""; - String minLargeMessageSize = ""; - String producerMaxRate = ""; - String producerWindowSize = ""; - String reconnectAttempts = ""; - String retryInterval = ""; - String retryIntervalMultiplier = ""; - String scheduledThreadMaxPoolSize = ""; - String threadMaxPoolSize = ""; - String transactionBatchSize = ""; - String type = ""; - String entries = ""; - String autoGroup = ""; - String blockOnAcknowledge = ""; - String blockOnDurableSend = ""; - String blockOnNonDurableSend = ""; - String cacheLargeMessagesClient = ""; - String compressLargeMessages = ""; - String failoverOnInitialConnection = ""; - String ha = ""; - String preacknowledge = ""; - String useGlobalPools = ""; - - boolean endLoop = false; - - while (reader.hasNext()) { - int eventType = reader.getEventType(); - switch (eventType) { - case XMLStreamConstants.START_ELEMENT: - if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) { - callFailoverTimeout = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) { - callTimeout = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory callTimeout: " + callTimeout); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) { - clientFailureCheckPeriod = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) { - clientId = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory clientId: " + clientId); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) { - confirmationWindowSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) { - connectionTtl = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory connectionTtl: " + connectionTtl); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) { - connectors = getConnectors(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory getLocalName: " + connectors); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) { - consumerMaxRate = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) { - consumerWindowSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) { - discoveryGroupName = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) { - dupsOkBatchSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) { - groupId = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory groupId: " + groupId); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) { - loadBalancingPolicyClassName = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) { - maxRetryInterval = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) { - minLargeMessageSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) { - name = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory name: " + name); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) { - producerMaxRate = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory producerMaxRate: " + producerMaxRate); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) { - producerWindowSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory producerWindowSize: " + producerWindowSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) { - reconnectAttempts = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) { - retryInterval = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory retryInterval: " + retryInterval); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) { - retryIntervalMultiplier = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) { - scheduledThreadMaxPoolSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) { - threadMaxPoolSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) { - transactionBatchSize = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) { - type = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory type: " + type); - } - } else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) { - entries = getEntries(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory entries: " + entries); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) { - autoGroup = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory autoGroup: " + autoGroup); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) { - blockOnAcknowledge = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) { - blockOnDurableSend = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) { - blockOnNonDurableSend = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) { - cacheLargeMessagesClient = reader.getElementText(); - ActiveMQServerLogger.LOGGER.info("JMS connection factory " + name + " cacheLargeMessagesClient: " + cacheLargeMessagesClient); - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) { - compressLargeMessages = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) { - failoverOnInitialConnection = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) { - ha = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory ha: " + ha); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) { - preacknowledge = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory preacknowledge: " + preacknowledge); - } - } else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) { - useGlobalPools = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS connection factory useGlobalPools: " + useGlobalPools); - } - } - break; - case XMLStreamConstants.END_ELEMENT: - if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) { - endLoop = true; - } - break; - } - if (endLoop) { - break; - } - reader.next(); } - ActiveMQServerLogger.LOGGER.error("Ignoring Connection Factory " + name); - } + ClientSession.AddressQuery addressQuery = session.addressQuery(new SimpleString(addressName)); - private void createJmsDestination() throws Exception { - String name = ""; - String selector = ""; - String entries = ""; - String type = ""; - boolean endLoop = false; - - while (reader.hasNext()) { - int eventType = reader.getEventType(); - switch (eventType) { - case XMLStreamConstants.START_ELEMENT: - if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) { - name = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS destination name: " + name); - } - } else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) { - selector = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS destination selector: " + selector); - } - } else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) { - type = reader.getElementText(); - if (logger.isDebugEnabled()) { - logger.debug("JMS destination type: " + type); - } - } else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) { - entries = getEntries(); - } - break; - case XMLStreamConstants.END_ELEMENT: - if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) { - endLoop = true; - } - break; + if (!addressQuery.isExists()) { + Set set = new HashSet<>(); + for (String routingType : ListUtil.toList(routingTypes)) { + set.add(RoutingType.valueOf(routingType)); } - if (endLoop) { - break; + session.createAddress(SimpleString.toSimpleString(addressName), set, false); + if (logger.isDebugEnabled()) { + logger.debug("Binding address(name=" + addressName + ", routingTypes=" + routingTypes + ")"); } - reader.next(); - } - - try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) { - ClientMessage managementMessage = managementSession.createMessage(false); - if ("Queue".equals(type)) { - ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.BROKER, "createQueue", name, entries, selector); - } else if ("Topic".equals(type)) { - ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.BROKER, "createAddress", name, entries); - } - managementSession.start(); - ClientMessage reply = requestor.request(managementMessage); - if (ManagementHelper.hasOperationSucceeded(reply)) { - if (logger.isDebugEnabled()) { - logger.debug("Created " + type.toLowerCase() + " " + name); - } - } else { - ActiveMQServerLogger.LOGGER.error("Problem creating " + name); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Binding " + addressName + " already exists so won't re-bind."); } } } diff --git a/artemis-cli/src/main/resources/schema/artemis-import-export.xsd b/artemis-cli/src/main/resources/schema/artemis-import-export.xsd new file mode 100644 index 0000000000..f6890561cd --- /dev/null +++ b/artemis-cli/src/main/resources/schema/artemis-import-export.xsd @@ -0,0 +1,248 @@ + + + + + + + + + + + the routing types supported by the address; valid values: MULTICAST, ANYCAST + + + + + + + the name of the address binding + + + + + + + the id of the address binding + + + + + + + + + + + + + the address name of the queue binding + + + + + + + the binding's filter (i.e. if using JMS selector syntax see + org.apache.activemq.artemis.utils.SelectorTranslator.convertToActiveMQFilterString for + conversion semantics) + + + + + + + the queue name of the binding + + + + + + + the binding's identifier + + + + + + + the binding's routing type + + + + + + + + + + + + + + + + + + + the property's name + + + + + + + the property's value; byte arrays are Base64 encoded the same way as the message's body + + + + + + + the property's type; valid values: boolean, byte, bytes, short, integer, long, float, double, + string, simple-string + + + + + + + + + + + + + + + + + + the queue's name + + + + + + + + + + + + + + + + + + + + + + + + the message's properties + + + + + + + a list of queues that hold a reference to this message + + + + + + + the body of the message (Base64 encoded using the Base64.DONT_BREAK_LINES and Base64.URL_SAFE options; + see org.apache.activemq.artemis.utils.Base64.encodeBytes(byte[], int, int, int)); stored in a CDATA + + + + + + + + the queue's identifier + + + + + + + the priority of the message (between 0-9 inclusive) + + + + + + + when this message will expire (epoch time value, 0 for never) + + + + + + + when this message was sent originally (epoch time value) + + + + + + + the message's type; valid values: DEFAULT, OBJECT, TEXT, BYTES, MAP, STREAM + + + + + + + the id of the user who sent the message + + + + + + + + + + + + + + + a list of exported bindings + + + + + + + a list of exported messages + + + + + + \ No newline at end of file diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ListUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ListUtil.java new file mode 100644 index 0000000000..666d2a9637 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ListUtil.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.ArrayList; +import java.util.List; + +public class ListUtil { + public static List toList(final String commaSeparatedString) { + List list = new ArrayList<>(); + if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) { + return list; + } + String[] values = commaSeparatedString.split(","); + for (String value : values) { + list.add(value.trim()); + } + return list; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 271d3e081f..234366efec 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -104,6 +104,7 @@ import org.apache.activemq.artemis.core.transaction.impl.CoreTransactionDetail; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.JsonLoader; +import org.apache.activemq.artemis.utils.ListUtil; import org.apache.activemq.artemis.utils.SecurityFormatter; import org.apache.activemq.artemis.utils.TypedProperties; @@ -622,7 +623,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { Set set = new HashSet<>(); - for (String routingType : toList(routingTypes)) { + for (String routingType : ListUtil.toList(routingTypes)) { set.add(RoutingType.valueOf(routingType)); } final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set); @@ -2095,7 +2096,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active if (useDiscoveryGroup) { config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup); } else { - config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup)); + config.setStaticConnectors(ListUtil.toList(staticConnectorsOrDiscoveryGroup)); } server.deployBridge(config); @@ -2132,7 +2133,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active if (useDiscoveryGroup) { config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup); } else { - config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup)); + config.setStaticConnectors(ListUtil.toList(staticConnectorsOrDiscoveryGroup)); } server.deployBridge(config); @@ -2440,18 +2441,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active return new String[0]; } - private static List toList(final String commaSeparatedString) { - List list = new ArrayList<>(); - if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) { - return list; - } - String[] values = commaSeparatedString.split(","); - for (String value : values) { - list.add(value.trim()); - } - return list; - } - @Override public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) { if (!(notification.getType() instanceof CoreNotificationType)) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java index 694b112298..09f9fc348a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java @@ -234,6 +234,9 @@ public class FileLockNodeManager extends NodeManager { ByteBuffer bb = ByteBuffer.allocateDirect(1); bb.put(status); bb.position(0); + if (!channel.isOpen()) { + setUpServerLockFile(); + } channel.write(bb, 0); channel.force(true); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index 786302108e..d644fae11e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -27,6 +27,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import org.apache.activemq.artemis.api.core.Message; @@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; @@ -96,7 +99,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { public void testMessageProperties() throws Exception { ClientSession session = basicSetUp(); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); @@ -147,6 +150,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -167,14 +172,17 @@ public class XmlImportExportTest extends ActiveMQTestBase { assertEquals(i, msg.getIntProperty("myIntProperty").intValue()); assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue()); assertEquals(i, msg.getObjectProperty("myObjectProperty")); + assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullObjectProperty"))); assertEquals(null, msg.getObjectProperty("myNullObjectProperty")); assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue()); assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty")); + assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullStringProperty"))); assertEquals(null, msg.getStringProperty("myNullStringProperty")); assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty")); assertEquals(special, msg.getStringProperty("mySpecialCharacters")); assertEquals(new SimpleString("mySimpleStringPropertyValue_" + i), msg.getSimpleStringProperty(new SimpleString("mySimpleStringProperty"))); - assertEquals(null, msg.getSimpleStringProperty(new SimpleString("myNullSimpleStringProperty"))); + assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullSimpleStringProperty"))); + assertEquals(null, msg.getSimpleStringProperty("myNullSimpleStringProperty")); } } @@ -202,7 +210,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSession session = basicSetUp(); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); @@ -239,6 +247,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -268,7 +278,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSession session = basicSetUp(); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true); @@ -293,6 +303,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -311,7 +323,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSession session = basicSetUp(); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true); @@ -336,6 +348,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -352,7 +366,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSession session = basicSetUp(); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); @@ -381,6 +395,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -396,8 +412,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { public void testBindingAttributes() throws Exception { ClientSession session = basicSetUp(); - session.createQueue("addressName1", "queueName1", true); - session.createQueue("addressName1", "queueName2", "bob", true); + session.createQueue("addressName1", RoutingType.MULTICAST, "queueName1", true); + session.createQueue("addressName1", RoutingType.MULTICAST, "queueName2", "bob", true); session.close(); locator.close(); @@ -417,6 +433,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1")); @@ -452,7 +470,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { fileMessage.releaseResources(); - session.createQueue("A", "A", true); + session.createQueue("A", RoutingType.MULTICAST, "A", true); ClientProducer prod = session.createProducer("A"); @@ -480,6 +498,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); session.close(); session = factory.createSession(false, false); @@ -507,6 +527,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test"); Connection c = cf.createConnection(); Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + server.createQueue(SimpleString.toSimpleString("A"), RoutingType.ANYCAST, SimpleString.toSimpleString("A"), null, true, false); MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A")); p.setDeliveryMode(DeliveryMode.PERSISTENT); StringBuilder stringBuilder = new StringBuilder(); @@ -533,9 +554,11 @@ public class XmlImportExportTest extends ActiveMQTestBase { factory = createSessionFactory(locator); ClientSession session = factory.createSession(false, true, true); - ByteArrayInputStream inputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); - xmlDataImporter.process(inputStream, session); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); + xmlDataImporter.process(xmlInputStream, session); session.close(); c = cf.createConnection(); @@ -553,8 +576,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { public void testPartialQueue() throws Exception { ClientSession session = basicSetUp(); - session.createQueue("myAddress", "myQueue1", true); - session.createQueue("myAddress", "myQueue2", true); + session.createQueue("myAddress", RoutingType.MULTICAST, "myQueue1", true); + session.createQueue("myAddress", RoutingType.MULTICAST, "myQueue2", true); ClientProducer producer = session.createProducer("myAddress"); @@ -586,6 +609,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); consumer = session.createConsumer("myQueue1"); session.start(); @@ -618,8 +643,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(false, true, true); - session.createQueue(MY_ADDRESS, MY_QUEUE, true); - session.createQueue(MY_ADDRESS, MY_QUEUE2, true); + session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true); + session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE2, true); ClientProducer producer = session.createProducer(MY_ADDRESS); @@ -650,6 +675,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(MY_QUEUE); @@ -686,7 +713,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { factory = createSessionFactory(locator); ClientSession session = factory.createSession(false, true, true); - session.createQueue(MY_ADDRESS, MY_QUEUE, true); + session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true); ClientProducer producer = session.createProducer(MY_ADDRESS); @@ -715,6 +742,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(MY_QUEUE); @@ -747,7 +776,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(false, true, true); - session.createQueue(MY_ADDRESS, MY_QUEUE, true); + session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true); ClientProducer producer = session.createProducer(MY_ADDRESS); @@ -793,6 +822,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session); ClientConsumer consumer = session.createConsumer(MY_QUEUE); @@ -824,7 +855,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { public void testTransactional() throws Exception { ClientSession session = basicSetUp(); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); @@ -850,6 +881,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session, managementSession); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -867,7 +900,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(false, true, true); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); @@ -894,6 +927,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session, managementSession); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -916,7 +951,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { ClientSessionFactory factory = locator.createSessionFactory(); ClientSession session = factory.createSession(false, true, true); - session.createQueue(QUEUE_NAME, QUEUE_NAME, true); + session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true); ClientProducer producer = session.createProducer(QUEUE_NAME); @@ -948,6 +983,8 @@ public class XmlImportExportTest extends ActiveMQTestBase { ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); xmlDataImporter.process(xmlInputStream, session, managementSession); ClientConsumer consumer = session.createConsumer(QUEUE_NAME); session.start(); @@ -963,4 +1000,44 @@ public class XmlImportExportTest extends ActiveMQTestBase { locator.close(); server.stop(); } + + @Test + public void testRoutingTypes() throws Exception { + SimpleString myAddress = SimpleString.toSimpleString("myAddress"); + ClientSession session = basicSetUp(); + + Set routingTypes = new HashSet<>(); + routingTypes.add(RoutingType.ANYCAST); + routingTypes.add(RoutingType.MULTICAST); + + session.createAddress(myAddress, routingTypes, false); + + session.createQueue(myAddress.toString(), RoutingType.MULTICAST, "myQueue1", true); + session.createQueue(myAddress.toString(), RoutingType.MULTICAST, "myQueue2", true); + + locator.close(); + server.stop(); + + ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); + XmlDataExporter xmlDataExporter = new XmlDataExporter(); + xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); + System.out.print(new String(xmlOutputStream.toByteArray())); + + clearDataRecreateServerDirs(); + server.start(); + checkForLongs(); + locator = createInVMNonHALocator(); + factory = locator.createSessionFactory(); + session = factory.createSession(false, false, true); + ClientSession managementSession = factory.createSession(false, true, true); + + ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); + XmlDataImporter xmlDataImporter = new XmlDataImporter(); + xmlDataImporter.validate(xmlInputStream); + xmlInputStream.reset(); + xmlDataImporter.process(xmlInputStream, session, managementSession); + + assertTrue(server.getAddressInfo(myAddress).getRoutingTypes().contains(RoutingType.ANYCAST)); + assertTrue(server.getAddressInfo(myAddress).getRoutingTypes().contains(RoutingType.MULTICAST)); + } }