ARTEMIS-943 update/doc XML import/export
This commit is contained in:
parent
ca6beb16c1
commit
15d693dd4e
|
@ -29,13 +29,19 @@ public final class XmlDataConstants {
|
||||||
static final String XML_VERSION = "1.0";
|
static final String XML_VERSION = "1.0";
|
||||||
static final String DOCUMENT_PARENT = "activemq-journal";
|
static final String DOCUMENT_PARENT = "activemq-journal";
|
||||||
static final String BINDINGS_PARENT = "bindings";
|
static final String BINDINGS_PARENT = "bindings";
|
||||||
static final String BINDINGS_CHILD = "binding";
|
|
||||||
static final String BINDING_ADDRESS = "address";
|
static final String QUEUE_BINDINGS_CHILD = "queue-binding";
|
||||||
static final String BINDING_FILTER_STRING = "filter-string";
|
static final String QUEUE_BINDING_ADDRESS = "address";
|
||||||
static final String BINDING_QUEUE_NAME = "queue-name";
|
static final String QUEUE_BINDING_FILTER_STRING = "filter-string";
|
||||||
static final String BINDING_ID = "id";
|
static final String QUEUE_BINDING_NAME = "name";
|
||||||
static final String JMS_CONNECTION_FACTORY = "jms-connection-factory";
|
static final String QUEUE_BINDING_ID = "id";
|
||||||
static final String JMS_CONNECTION_FACTORIES = "jms-connection-factories";
|
static final String QUEUE_BINDING_ROUTING_TYPE = "routing-type";
|
||||||
|
|
||||||
|
static final String ADDRESS_BINDINGS_CHILD = "address-binding";
|
||||||
|
static final String ADDRESS_BINDING_NAME = "name";
|
||||||
|
static final String ADDRESS_BINDING_ID = "id";
|
||||||
|
static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
|
||||||
|
|
||||||
static final String MESSAGES_PARENT = "messages";
|
static final String MESSAGES_PARENT = "messages";
|
||||||
static final String MESSAGES_CHILD = "message";
|
static final String MESSAGES_CHILD = "message";
|
||||||
static final String MESSAGE_ID = "id";
|
static final String MESSAGE_ID = "id";
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -42,14 +41,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
|
||||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
|
||||||
import org.apache.activemq.artemis.core.journal.Journal;
|
import org.apache.activemq.artemis.core.journal.Journal;
|
||||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
|
@ -74,19 +69,16 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
|
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
|
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
|
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
|
|
||||||
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
|
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.Base64;
|
import org.apache.activemq.artemis.utils.Base64;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
@ -115,11 +107,7 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
|
|
||||||
private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();
|
private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();
|
||||||
|
|
||||||
private final Map<String, PersistedConnectionFactory> jmsConnectionFactories = new ConcurrentHashMap<>();
|
private final HashMap<Long, PersistentAddressBindingEncoding> addressBindings = new HashMap<>();
|
||||||
|
|
||||||
private final Map<Pair<PersistedType, String>, PersistedDestination> jmsDestinations = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private final Map<Pair<PersistedType, String>, PersistedBindings> jmsJNDI = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
long messagesPrinted = 0L;
|
long messagesPrinted = 0L;
|
||||||
|
|
||||||
|
@ -161,7 +149,6 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
private void writeXMLData() throws Exception {
|
private void writeXMLData() throws Exception {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
getBindings();
|
getBindings();
|
||||||
getJmsBindings();
|
|
||||||
processMessageJournal();
|
processMessageJournal();
|
||||||
printDataAsXML();
|
printDataAsXML();
|
||||||
ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
|
ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
|
||||||
|
@ -298,58 +285,6 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getJmsBindings() throws Exception {
|
|
||||||
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
|
|
||||||
|
|
||||||
Journal jmsJournal = new JournalImpl(1024 * 1024, 2, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
|
|
||||||
|
|
||||||
jmsJournal.start();
|
|
||||||
|
|
||||||
List<RecordInfo> data = new ArrayList<>();
|
|
||||||
|
|
||||||
ArrayList<PreparedTransactionInfo> list = new ArrayList<>();
|
|
||||||
|
|
||||||
ActiveMQServerLogger.LOGGER.debug("Reading jms bindings journal from " + config.getBindingsDirectory());
|
|
||||||
|
|
||||||
jmsJournal.load(data, list, null);
|
|
||||||
|
|
||||||
for (RecordInfo record : data) {
|
|
||||||
long id = record.id;
|
|
||||||
|
|
||||||
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(record.data);
|
|
||||||
|
|
||||||
byte rec = record.getUserRecordType();
|
|
||||||
|
|
||||||
if (rec == JMSJournalStorageManagerImpl.CF_RECORD) {
|
|
||||||
PersistedConnectionFactory cf = new PersistedConnectionFactory();
|
|
||||||
cf.decode(buffer);
|
|
||||||
cf.setId(id);
|
|
||||||
ActiveMQServerLogger.LOGGER.info("Found JMS connection factory: " + cf.getName());
|
|
||||||
jmsConnectionFactories.put(cf.getName(), cf);
|
|
||||||
} else if (rec == JMSJournalStorageManagerImpl.DESTINATION_RECORD) {
|
|
||||||
PersistedDestination destination = new PersistedDestination();
|
|
||||||
destination.decode(buffer);
|
|
||||||
destination.setId(id);
|
|
||||||
ActiveMQServerLogger.LOGGER.info("Found JMS destination: " + destination.getName());
|
|
||||||
jmsDestinations.put(new Pair<>(destination.getType(), destination.getName()), destination);
|
|
||||||
} else if (rec == JMSJournalStorageManagerImpl.BINDING_RECORD) {
|
|
||||||
PersistedBindings jndi = new PersistedBindings();
|
|
||||||
jndi.decode(buffer);
|
|
||||||
jndi.setId(id);
|
|
||||||
Pair<PersistedType, String> key = new Pair<>(jndi.getType(), jndi.getName());
|
|
||||||
StringBuilder builder = new StringBuilder();
|
|
||||||
for (String binding : jndi.getBindings()) {
|
|
||||||
builder.append(binding).append(" ");
|
|
||||||
}
|
|
||||||
ActiveMQServerLogger.LOGGER.info("Found JMS JNDI binding data for " + jndi.getType() + " " + jndi.getName() + ": " + builder.toString());
|
|
||||||
jmsJNDI.put(key, jndi);
|
|
||||||
} else {
|
|
||||||
throw new IllegalStateException("Invalid record type " + rec);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Open the bindings journal and extract all bindings data.
|
* Open the bindings journal and extract all bindings data.
|
||||||
*
|
*
|
||||||
|
@ -370,6 +305,9 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
|
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
|
||||||
PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
|
PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
|
||||||
queueBindings.put(bindingEncoding.getId(), bindingEncoding);
|
queueBindings.put(bindingEncoding.getId(), bindingEncoding);
|
||||||
|
} else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
|
||||||
|
PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
|
||||||
|
addressBindings.put(bindingEncoding.getId(), bindingEncoding);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,8 +319,6 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
|
xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
|
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
|
||||||
printBindingsAsXML();
|
printBindingsAsXML();
|
||||||
printJmsConnectionFactoriesAsXML();
|
|
||||||
printJmsDestinationsAsXML();
|
|
||||||
printAllMessagesAsXML();
|
printAllMessagesAsXML();
|
||||||
xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
|
xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
|
||||||
xmlWriter.writeEndDocument();
|
xmlWriter.writeEndDocument();
|
||||||
|
@ -395,256 +331,35 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
|
|
||||||
private void printBindingsAsXML() throws XMLStreamException {
|
private void printBindingsAsXML() throws XMLStreamException {
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
|
xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
|
||||||
|
for (Map.Entry<Long, PersistentAddressBindingEncoding> addressBindingEncodingEntry : addressBindings.entrySet()) {
|
||||||
|
PersistentAddressBindingEncoding bindingEncoding = addressBindings.get(addressBindingEncodingEntry.getKey());
|
||||||
|
xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
|
||||||
|
StringBuilder routingTypes = new StringBuilder();
|
||||||
|
for (RoutingType routingType : bindingEncoding.getRoutingTypes()) {
|
||||||
|
routingTypes.append(routingType.toString()).append(", ");
|
||||||
|
}
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString().substring(0, routingTypes.length() - 2));
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString());
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId()));
|
||||||
|
bindingsPrinted++;
|
||||||
|
}
|
||||||
for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet()) {
|
for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet()) {
|
||||||
PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
|
PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
|
||||||
xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD);
|
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString());
|
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
|
||||||
String filter = "";
|
String filter = "";
|
||||||
if (bindingEncoding.getFilterString() != null) {
|
if (bindingEncoding.getFilterString() != null) {
|
||||||
filter = bindingEncoding.getFilterString().toString();
|
filter = bindingEncoding.getFilterString().toString();
|
||||||
}
|
}
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
|
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
|
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
|
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
|
||||||
bindingsPrinted++;
|
bindingsPrinted++;
|
||||||
}
|
}
|
||||||
xmlWriter.writeEndElement(); // end BINDINGS_PARENT
|
xmlWriter.writeEndElement(); // end BINDINGS_PARENT
|
||||||
}
|
}
|
||||||
|
|
||||||
private void printJmsConnectionFactoriesAsXML() throws XMLStreamException {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORIES);
|
|
||||||
for (String jmsConnectionFactoryKey : jmsConnectionFactories.keySet()) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY);
|
|
||||||
PersistedConnectionFactory jmsConnectionFactory = jmsConnectionFactories.get(jmsConnectionFactoryKey);
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_NAME);
|
|
||||||
xmlWriter.writeCharacters(jmsConnectionFactory.getName());
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
String clientID = jmsConnectionFactory.getConfig().getClientID();
|
|
||||||
if (clientID != null) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID);
|
|
||||||
xmlWriter.writeCharacters(clientID);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
|
|
||||||
long callFailoverTimeout = jmsConnectionFactory.getConfig().getCallFailoverTimeout();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(callFailoverTimeout));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long callTimeout = jmsConnectionFactory.getConfig().getCallTimeout();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(callTimeout));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long clientFailureCheckPeriod = jmsConnectionFactory.getConfig().getClientFailureCheckPeriod();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(clientFailureCheckPeriod));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
int confirmationWindowSize = jmsConnectionFactory.getConfig().getConfirmationWindowSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Integer.toString(confirmationWindowSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long connectionTTL = jmsConnectionFactory.getConfig().getConnectionTTL();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(connectionTTL));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long consumerMaxRate = jmsConnectionFactory.getConfig().getConsumerMaxRate();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(consumerMaxRate));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long consumerWindowSize = jmsConnectionFactory.getConfig().getConsumerWindowSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(consumerWindowSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
String discoveryGroupName = jmsConnectionFactory.getConfig().getDiscoveryGroupName();
|
|
||||||
if (discoveryGroupName != null) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME);
|
|
||||||
xmlWriter.writeCharacters(discoveryGroupName);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
|
|
||||||
int dupsOKBatchSize = jmsConnectionFactory.getConfig().getDupsOKBatchSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Integer.toString(dupsOKBatchSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
JMSFactoryType factoryType = jmsConnectionFactory.getConfig().getFactoryType();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE);
|
|
||||||
xmlWriter.writeCharacters(Integer.toString(factoryType.intValue()));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
String groupID = jmsConnectionFactory.getConfig().getGroupID();
|
|
||||||
if (groupID != null) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID);
|
|
||||||
xmlWriter.writeCharacters(groupID);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
|
|
||||||
String loadBalancingPolicyClassName = jmsConnectionFactory.getConfig().getLoadBalancingPolicyClassName();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME);
|
|
||||||
xmlWriter.writeCharacters(loadBalancingPolicyClassName);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long maxRetryInterval = jmsConnectionFactory.getConfig().getMaxRetryInterval();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(maxRetryInterval));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long minLargeMessageSize = jmsConnectionFactory.getConfig().getMinLargeMessageSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(minLargeMessageSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long producerMaxRate = jmsConnectionFactory.getConfig().getProducerMaxRate();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(producerMaxRate));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long producerWindowSize = jmsConnectionFactory.getConfig().getProducerWindowSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(producerWindowSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long reconnectAttempts = jmsConnectionFactory.getConfig().getReconnectAttempts();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(reconnectAttempts));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long retryInterval = jmsConnectionFactory.getConfig().getRetryInterval();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(retryInterval));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
double retryIntervalMultiplier = jmsConnectionFactory.getConfig().getRetryIntervalMultiplier();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER);
|
|
||||||
xmlWriter.writeCharacters(Double.toString(retryIntervalMultiplier));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long scheduledThreadPoolMaxSize = jmsConnectionFactory.getConfig().getScheduledThreadPoolMaxSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(scheduledThreadPoolMaxSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long threadPoolMaxSize = jmsConnectionFactory.getConfig().getThreadPoolMaxSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(threadPoolMaxSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
long transactionBatchSize = jmsConnectionFactory.getConfig().getTransactionBatchSize();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE);
|
|
||||||
xmlWriter.writeCharacters(Long.toString(transactionBatchSize));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean autoGroup = jmsConnectionFactory.getConfig().isAutoGroup();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(autoGroup));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean blockOnAcknowledge = jmsConnectionFactory.getConfig().isBlockOnAcknowledge();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(blockOnAcknowledge));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean blockOnDurableSend = jmsConnectionFactory.getConfig().isBlockOnDurableSend();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(blockOnDurableSend));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean blockOnNonDurableSend = jmsConnectionFactory.getConfig().isBlockOnNonDurableSend();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(blockOnNonDurableSend));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean cacheLargeMessagesClient = jmsConnectionFactory.getConfig().isCacheLargeMessagesClient();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(cacheLargeMessagesClient));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean compressLargeMessages = jmsConnectionFactory.getConfig().isCompressLargeMessages();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(compressLargeMessages));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean failoverOnInitialConnection = jmsConnectionFactory.getConfig().isFailoverOnInitialConnection();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(failoverOnInitialConnection));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean ha = jmsConnectionFactory.getConfig().isHA();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_HA);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(ha));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean preAcknowledge = jmsConnectionFactory.getConfig().isPreAcknowledge();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(preAcknowledge));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
boolean useGlobalPools = jmsConnectionFactory.getConfig().isUseGlobalPools();
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS);
|
|
||||||
xmlWriter.writeCharacters(Boolean.toString(useGlobalPools));
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTORS);
|
|
||||||
for (String connector : jmsConnectionFactory.getConfig().getConnectorNames()) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR);
|
|
||||||
xmlWriter.writeCharacters(connector);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRIES);
|
|
||||||
PersistedBindings jndi = jmsJNDI.get(new Pair<>(PersistedType.ConnectionFactory, jmsConnectionFactory.getName()));
|
|
||||||
for (String jndiEntry : jndi.getBindings()) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRY);
|
|
||||||
xmlWriter.writeCharacters(jndiEntry);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement(); // end jndi-entries
|
|
||||||
xmlWriter.writeEndElement(); // end JMS_CONNECTION_FACTORY
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printJmsDestinationsAsXML() throws XMLStreamException {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATIONS);
|
|
||||||
for (Pair<PersistedType, String> jmsDestinationsKey : jmsDestinations.keySet()) {
|
|
||||||
PersistedDestination jmsDestination = jmsDestinations.get(jmsDestinationsKey);
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION);
|
|
||||||
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_NAME);
|
|
||||||
xmlWriter.writeCharacters(jmsDestination.getName());
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
String selector = jmsDestination.getSelector();
|
|
||||||
if (selector != null && selector.length() != 0) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_SELECTOR);
|
|
||||||
xmlWriter.writeCharacters(selector);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_TYPE);
|
|
||||||
xmlWriter.writeCharacters(jmsDestination.getType().toString());
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRIES);
|
|
||||||
PersistedBindings jndi = jmsJNDI.get(new Pair<>(jmsDestination.getType(), jmsDestination.getName()));
|
|
||||||
for (String jndiEntry : jndi.getBindings()) {
|
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRY);
|
|
||||||
xmlWriter.writeCharacters(jndiEntry);
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement(); // end jndi-entries
|
|
||||||
xmlWriter.writeEndElement(); // end JMS_CONNECTION_FACTORY
|
|
||||||
}
|
|
||||||
xmlWriter.writeEndElement();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printAllMessagesAsXML() throws XMLStreamException {
|
private void printAllMessagesAsXML() throws XMLStreamException {
|
||||||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
|
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
|
||||||
|
|
||||||
|
@ -822,7 +537,10 @@ public final class XmlDataExporter extends OptionalLocking {
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value == null ? XmlDataConstants.NULL : value.toString());
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value == null ? XmlDataConstants.NULL : value.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (value instanceof Boolean) {
|
// if the value is null then we can't really know what it is so just set the type to the most generic thing
|
||||||
|
if (value == null) {
|
||||||
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
|
||||||
|
} else if (value instanceof Boolean) {
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
|
||||||
} else if (value instanceof Byte) {
|
} else if (value instanceof Byte) {
|
||||||
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);
|
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);
|
||||||
|
|
|
@ -16,10 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.cli.commands.tools;
|
package org.apache.activemq.artemis.cli.commands.tools;
|
||||||
|
|
||||||
|
import javax.xml.XMLConstants;
|
||||||
import javax.xml.stream.XMLInputFactory;
|
import javax.xml.stream.XMLInputFactory;
|
||||||
import javax.xml.stream.XMLStreamConstants;
|
import javax.xml.stream.XMLStreamConstants;
|
||||||
import javax.xml.stream.XMLStreamException;
|
import javax.xml.stream.XMLStreamException;
|
||||||
import javax.xml.stream.XMLStreamReader;
|
import javax.xml.stream.XMLStreamReader;
|
||||||
|
import javax.xml.transform.stax.StAXSource;
|
||||||
|
import javax.xml.validation.Schema;
|
||||||
|
import javax.xml.validation.SchemaFactory;
|
||||||
|
import javax.xml.validation.Validator;
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
@ -27,10 +32,15 @@ import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.URL;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.security.AccessController;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
|
@ -53,7 +63,10 @@ import org.apache.activemq.artemis.core.message.impl.MessageImpl;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.utils.Base64;
|
import org.apache.activemq.artemis.utils.Base64;
|
||||||
|
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||||
|
import org.apache.activemq.artemis.utils.ListUtil;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
@ -75,7 +88,7 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
// this session is really only needed if the "session" variable does not auto-commit sends
|
// this session is really only needed if the "session" variable does not auto-commit sends
|
||||||
ClientSession managementSession;
|
ClientSession managementSession;
|
||||||
|
|
||||||
boolean localSession;
|
boolean localSession = false;
|
||||||
|
|
||||||
final Map<String, String> addressMap = new HashMap<>();
|
final Map<String, String> addressMap = new HashMap<>();
|
||||||
|
|
||||||
|
@ -164,20 +177,21 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
} else {
|
} else {
|
||||||
this.managementSession = session;
|
this.managementSession = session;
|
||||||
}
|
}
|
||||||
localSession = false;
|
|
||||||
|
|
||||||
processXml();
|
processXml();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception {
|
public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception {
|
||||||
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
|
|
||||||
HashMap<String, Object> connectionParams = new HashMap<>();
|
HashMap<String, Object> connectionParams = new HashMap<>();
|
||||||
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
|
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
|
||||||
connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
|
connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
|
||||||
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
|
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
|
||||||
ClientSessionFactory sf = serverLocator.createSessionFactory();
|
ClientSessionFactory sf = serverLocator.createSessionFactory();
|
||||||
|
|
||||||
|
ClientSession session;
|
||||||
|
ClientSession managementSession;
|
||||||
|
|
||||||
if (user != null || password != null) {
|
if (user != null || password != null) {
|
||||||
session = sf.createSession(user, password, false, !transactional, true, false, 0);
|
session = sf.createSession(user, password, false, !transactional, true, false, 0);
|
||||||
managementSession = sf.createSession(user, password, false, true, true, false, 0);
|
managementSession = sf.createSession(user, password, false, true, true, false, 0);
|
||||||
|
@ -187,7 +201,30 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
}
|
}
|
||||||
localSession = true;
|
localSession = true;
|
||||||
|
|
||||||
processXml();
|
process(inputStream, session, managementSession);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void validate(String file) throws Exception {
|
||||||
|
validate(new FileInputStream(file));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void validate(InputStream inputStream) throws Exception {
|
||||||
|
XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
|
||||||
|
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
|
||||||
|
Schema schema = factory.newSchema(XmlDataImporter.findResource("schema/artemis-import-export.xsd"));
|
||||||
|
|
||||||
|
Validator validator = schema.newValidator();
|
||||||
|
validator.validate(new StAXSource(reader));
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static URL findResource(final String resourceName) {
|
||||||
|
return AccessController.doPrivileged(new PrivilegedAction<URL>() {
|
||||||
|
@Override
|
||||||
|
public URL run() {
|
||||||
|
return ClassloadingUtil.findResource(resourceName);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processXml() throws Exception {
|
private void processXml() throws Exception {
|
||||||
|
@ -197,14 +234,12 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
|
logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
|
||||||
}
|
}
|
||||||
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
|
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
|
||||||
if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) {
|
if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) {
|
||||||
bindQueue();
|
bindQueue();
|
||||||
|
} else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) {
|
||||||
|
bindAddress();
|
||||||
} else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
|
} else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
|
||||||
processMessage();
|
processMessage();
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) {
|
|
||||||
createJmsConnectionFactories();
|
|
||||||
} else if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) {
|
|
||||||
createJmsDestinations();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reader.next();
|
reader.next();
|
||||||
|
@ -396,6 +431,10 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (value.equals(XmlDataConstants.NULL)) {
|
||||||
|
value = null;
|
||||||
|
}
|
||||||
|
|
||||||
switch (propertyType) {
|
switch (propertyType) {
|
||||||
case XmlDataConstants.PROPERTY_TYPE_SHORT:
|
case XmlDataConstants.PROPERTY_TYPE_SHORT:
|
||||||
message.putShortProperty(key, Short.parseShort(value));
|
message.putShortProperty(key, Short.parseShort(value));
|
||||||
|
@ -407,7 +446,7 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
message.putByteProperty(key, Byte.parseByte(value));
|
message.putByteProperty(key, Byte.parseByte(value));
|
||||||
break;
|
break;
|
||||||
case XmlDataConstants.PROPERTY_TYPE_BYTES:
|
case XmlDataConstants.PROPERTY_TYPE_BYTES:
|
||||||
message.putBytesProperty(key, decode(value));
|
message.putBytesProperty(key, value == null ? null : decode(value));
|
||||||
break;
|
break;
|
||||||
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
|
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
|
||||||
message.putDoubleProperty(key, Double.parseDouble(value));
|
message.putDoubleProperty(key, Double.parseDouble(value));
|
||||||
|
@ -422,16 +461,10 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
message.putLongProperty(key, Long.parseLong(value));
|
message.putLongProperty(key, Long.parseLong(value));
|
||||||
break;
|
break;
|
||||||
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
|
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
|
||||||
if (!value.equals(XmlDataConstants.NULL)) {
|
message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
|
||||||
realSimpleStringValue = new SimpleString(value);
|
|
||||||
}
|
|
||||||
message.putStringProperty(new SimpleString(key), realSimpleStringValue);
|
|
||||||
break;
|
break;
|
||||||
case XmlDataConstants.PROPERTY_TYPE_STRING:
|
case XmlDataConstants.PROPERTY_TYPE_STRING:
|
||||||
if (!value.equals(XmlDataConstants.NULL)) {
|
message.putStringProperty(key, value);
|
||||||
realStringValue = value;
|
|
||||||
}
|
|
||||||
message.putStringProperty(key, realStringValue);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -509,26 +542,30 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
String queueName = "";
|
String queueName = "";
|
||||||
String address = "";
|
String address = "";
|
||||||
String filter = "";
|
String filter = "";
|
||||||
|
String routingType = "";
|
||||||
|
|
||||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||||
String attributeName = reader.getAttributeLocalName(i);
|
String attributeName = reader.getAttributeLocalName(i);
|
||||||
switch (attributeName) {
|
switch (attributeName) {
|
||||||
case XmlDataConstants.BINDING_ADDRESS:
|
case XmlDataConstants.QUEUE_BINDING_ADDRESS:
|
||||||
address = reader.getAttributeValue(i);
|
address = reader.getAttributeValue(i);
|
||||||
break;
|
break;
|
||||||
case XmlDataConstants.BINDING_QUEUE_NAME:
|
case XmlDataConstants.QUEUE_BINDING_NAME:
|
||||||
queueName = reader.getAttributeValue(i);
|
queueName = reader.getAttributeValue(i);
|
||||||
break;
|
break;
|
||||||
case XmlDataConstants.BINDING_FILTER_STRING:
|
case XmlDataConstants.QUEUE_BINDING_FILTER_STRING:
|
||||||
filter = reader.getAttributeValue(i);
|
filter = reader.getAttributeValue(i);
|
||||||
break;
|
break;
|
||||||
|
case XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE:
|
||||||
|
routingType = reader.getAttributeValue(i);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
|
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
|
||||||
|
|
||||||
if (!queueQuery.isExists()) {
|
if (!queueQuery.isExists()) {
|
||||||
session.createQueue(address, queueName, filter, true);
|
session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true);
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
|
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
|
||||||
}
|
}
|
||||||
|
@ -541,350 +578,36 @@ public final class XmlDataImporter extends ActionAbstract {
|
||||||
addressMap.put(queueName, address);
|
addressMap.put(queueName, address);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createJmsConnectionFactories() throws Exception {
|
private void bindAddress() throws Exception {
|
||||||
boolean endLoop = false;
|
String addressName = "";
|
||||||
|
String routingTypes = "";
|
||||||
|
|
||||||
while (reader.hasNext()) {
|
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||||
int eventType = reader.getEventType();
|
String attributeName = reader.getAttributeLocalName(i);
|
||||||
switch (eventType) {
|
switch (attributeName) {
|
||||||
case XMLStreamConstants.START_ELEMENT:
|
case XmlDataConstants.ADDRESS_BINDING_NAME:
|
||||||
if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) {
|
addressName = reader.getAttributeValue(i);
|
||||||
createJmsConnectionFactory();
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case XMLStreamConstants.END_ELEMENT:
|
case XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE:
|
||||||
if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) {
|
routingTypes = reader.getAttributeValue(i);
|
||||||
endLoop = true;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (endLoop) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createJmsDestinations() throws Exception {
|
ClientSession.AddressQuery addressQuery = session.addressQuery(new SimpleString(addressName));
|
||||||
boolean endLoop = false;
|
|
||||||
|
|
||||||
while (reader.hasNext()) {
|
if (!addressQuery.isExists()) {
|
||||||
int eventType = reader.getEventType();
|
Set<RoutingType> set = new HashSet<>();
|
||||||
switch (eventType) {
|
for (String routingType : ListUtil.toList(routingTypes)) {
|
||||||
case XMLStreamConstants.START_ELEMENT:
|
set.add(RoutingType.valueOf(routingType));
|
||||||
if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) {
|
|
||||||
createJmsDestination();
|
|
||||||
}
|
}
|
||||||
break;
|
session.createAddress(SimpleString.toSimpleString(addressName), set, false);
|
||||||
case XMLStreamConstants.END_ELEMENT:
|
|
||||||
if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) {
|
|
||||||
endLoop = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (endLoop) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createJmsConnectionFactory() throws Exception {
|
|
||||||
String name = "";
|
|
||||||
String callFailoverTimeout = "";
|
|
||||||
String callTimeout = "";
|
|
||||||
String clientFailureCheckPeriod = "";
|
|
||||||
String clientId = "";
|
|
||||||
String confirmationWindowSize = "";
|
|
||||||
String connectionTtl = "";
|
|
||||||
String connectors = "";
|
|
||||||
String consumerMaxRate = "";
|
|
||||||
String consumerWindowSize = "";
|
|
||||||
String discoveryGroupName = "";
|
|
||||||
String dupsOkBatchSize = "";
|
|
||||||
String groupId = "";
|
|
||||||
String loadBalancingPolicyClassName = "";
|
|
||||||
String maxRetryInterval = "";
|
|
||||||
String minLargeMessageSize = "";
|
|
||||||
String producerMaxRate = "";
|
|
||||||
String producerWindowSize = "";
|
|
||||||
String reconnectAttempts = "";
|
|
||||||
String retryInterval = "";
|
|
||||||
String retryIntervalMultiplier = "";
|
|
||||||
String scheduledThreadMaxPoolSize = "";
|
|
||||||
String threadMaxPoolSize = "";
|
|
||||||
String transactionBatchSize = "";
|
|
||||||
String type = "";
|
|
||||||
String entries = "";
|
|
||||||
String autoGroup = "";
|
|
||||||
String blockOnAcknowledge = "";
|
|
||||||
String blockOnDurableSend = "";
|
|
||||||
String blockOnNonDurableSend = "";
|
|
||||||
String cacheLargeMessagesClient = "";
|
|
||||||
String compressLargeMessages = "";
|
|
||||||
String failoverOnInitialConnection = "";
|
|
||||||
String ha = "";
|
|
||||||
String preacknowledge = "";
|
|
||||||
String useGlobalPools = "";
|
|
||||||
|
|
||||||
boolean endLoop = false;
|
|
||||||
|
|
||||||
while (reader.hasNext()) {
|
|
||||||
int eventType = reader.getEventType();
|
|
||||||
switch (eventType) {
|
|
||||||
case XMLStreamConstants.START_ELEMENT:
|
|
||||||
if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) {
|
|
||||||
callFailoverTimeout = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout);
|
logger.debug("Binding address(name=" + addressName + ", routingTypes=" + routingTypes + ")");
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) {
|
|
||||||
callTimeout = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory callTimeout: " + callTimeout);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) {
|
|
||||||
clientFailureCheckPeriod = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) {
|
|
||||||
clientId = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory clientId: " + clientId);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) {
|
|
||||||
confirmationWindowSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) {
|
|
||||||
connectionTtl = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory connectionTtl: " + connectionTtl);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
|
|
||||||
connectors = getConnectors();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory getLocalName: " + connectors);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) {
|
|
||||||
consumerMaxRate = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) {
|
|
||||||
consumerWindowSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) {
|
|
||||||
discoveryGroupName = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) {
|
|
||||||
dupsOkBatchSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) {
|
|
||||||
groupId = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory groupId: " + groupId);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) {
|
|
||||||
loadBalancingPolicyClassName = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) {
|
|
||||||
maxRetryInterval = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) {
|
|
||||||
minLargeMessageSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) {
|
|
||||||
name = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory name: " + name);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) {
|
|
||||||
producerMaxRate = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory producerMaxRate: " + producerMaxRate);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) {
|
|
||||||
producerWindowSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory producerWindowSize: " + producerWindowSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) {
|
|
||||||
reconnectAttempts = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) {
|
|
||||||
retryInterval = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory retryInterval: " + retryInterval);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) {
|
|
||||||
retryIntervalMultiplier = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
|
|
||||||
scheduledThreadMaxPoolSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
|
|
||||||
threadMaxPoolSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) {
|
|
||||||
transactionBatchSize = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) {
|
|
||||||
type = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory type: " + type);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
|
|
||||||
entries = getEntries();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory entries: " + entries);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) {
|
|
||||||
autoGroup = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory autoGroup: " + autoGroup);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) {
|
|
||||||
blockOnAcknowledge = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) {
|
|
||||||
blockOnDurableSend = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) {
|
|
||||||
blockOnNonDurableSend = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) {
|
|
||||||
cacheLargeMessagesClient = reader.getElementText();
|
|
||||||
ActiveMQServerLogger.LOGGER.info("JMS connection factory " + name + " cacheLargeMessagesClient: " + cacheLargeMessagesClient);
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) {
|
|
||||||
compressLargeMessages = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) {
|
|
||||||
failoverOnInitialConnection = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) {
|
|
||||||
ha = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory ha: " + ha);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) {
|
|
||||||
preacknowledge = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory preacknowledge: " + preacknowledge);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) {
|
|
||||||
useGlobalPools = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS connection factory useGlobalPools: " + useGlobalPools);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case XMLStreamConstants.END_ELEMENT:
|
|
||||||
if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) {
|
|
||||||
endLoop = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (endLoop) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
ActiveMQServerLogger.LOGGER.error("Ignoring Connection Factory " + name);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createJmsDestination() throws Exception {
|
|
||||||
String name = "";
|
|
||||||
String selector = "";
|
|
||||||
String entries = "";
|
|
||||||
String type = "";
|
|
||||||
boolean endLoop = false;
|
|
||||||
|
|
||||||
while (reader.hasNext()) {
|
|
||||||
int eventType = reader.getEventType();
|
|
||||||
switch (eventType) {
|
|
||||||
case XMLStreamConstants.START_ELEMENT:
|
|
||||||
if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) {
|
|
||||||
name = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS destination name: " + name);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) {
|
|
||||||
selector = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS destination selector: " + selector);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) {
|
|
||||||
type = reader.getElementText();
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("JMS destination type: " + type);
|
|
||||||
}
|
|
||||||
} else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
|
|
||||||
entries = getEntries();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case XMLStreamConstants.END_ELEMENT:
|
|
||||||
if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) {
|
|
||||||
endLoop = true;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (endLoop) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
|
|
||||||
ClientMessage managementMessage = managementSession.createMessage(false);
|
|
||||||
if ("Queue".equals(type)) {
|
|
||||||
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.BROKER, "createQueue", name, entries, selector);
|
|
||||||
} else if ("Topic".equals(type)) {
|
|
||||||
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.BROKER, "createAddress", name, entries);
|
|
||||||
}
|
|
||||||
managementSession.start();
|
|
||||||
ClientMessage reply = requestor.request(managementMessage);
|
|
||||||
if (ManagementHelper.hasOperationSucceeded(reply)) {
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Created " + type.toLowerCase() + " " + name);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Binding " + addressName + " already exists so won't re-bind.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,248 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
|
||||||
|
attributeFormDefault="unqualified"
|
||||||
|
elementFormDefault="qualified"
|
||||||
|
version="1.0">
|
||||||
|
<xsd:element name="activemq-journal" type="activemq-journalType"/>
|
||||||
|
<xsd:complexType name="address-bindingType">
|
||||||
|
<xsd:simpleContent>
|
||||||
|
<xsd:extension base="xsd:string">
|
||||||
|
<xsd:attribute type="xsd:string" name="routing-types" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the routing types supported by the address; valid values: MULTICAST, ANYCAST
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="name" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the name of the address binding
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:long" name="id" use="optional">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the id of the address binding
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
</xsd:extension>
|
||||||
|
</xsd:simpleContent>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="queue-bindingType">
|
||||||
|
<xsd:simpleContent>
|
||||||
|
<xsd:extension base="xsd:string">
|
||||||
|
<xsd:attribute type="xsd:string" name="address" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the address name of the queue binding
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="filter-string" use="optional">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the binding's filter (i.e. if using JMS selector syntax see
|
||||||
|
org.apache.activemq.artemis.utils.SelectorTranslator.convertToActiveMQFilterString for
|
||||||
|
conversion semantics)
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="name" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the queue name of the binding
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:long" name="id" use="optional">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the binding's identifier
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="routing-type" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the binding's routing type
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
</xsd:extension>
|
||||||
|
</xsd:simpleContent>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="bindingsType">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element type="address-bindingType" name="address-binding" maxOccurs="unbounded" minOccurs="0"/>
|
||||||
|
<xsd:element type="queue-bindingType" name="queue-binding" maxOccurs="unbounded" minOccurs="0"/>
|
||||||
|
</xsd:sequence>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="propertyType">
|
||||||
|
<xsd:simpleContent>
|
||||||
|
<xsd:extension base="xsd:string">
|
||||||
|
<xsd:attribute type="xsd:string" name="name" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the property's name
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="value" use="required">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the property's value; byte arrays are Base64 encoded the same way as the message's body
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="type" use="optional">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the property's type; valid values: boolean, byte, bytes, short, integer, long, float, double,
|
||||||
|
string, simple-string
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
</xsd:extension>
|
||||||
|
</xsd:simpleContent>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="propertiesType">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element type="propertyType" name="property" maxOccurs="unbounded" minOccurs="0"/>
|
||||||
|
</xsd:sequence>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="queueType">
|
||||||
|
<xsd:simpleContent>
|
||||||
|
<xsd:extension base="xsd:string">
|
||||||
|
<xsd:attribute type="xsd:string" name="name">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the queue's name
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
</xsd:extension>
|
||||||
|
</xsd:simpleContent>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="queuesType">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element type="queueType" name="queue"/>
|
||||||
|
</xsd:sequence>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="bodyType">
|
||||||
|
<xsd:simpleContent>
|
||||||
|
<xsd:extension base="xsd:string">
|
||||||
|
<xsd:attribute type="xsd:boolean" name="isLarge" use="optional"/>
|
||||||
|
</xsd:extension>
|
||||||
|
</xsd:simpleContent>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="messageType">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element type="propertiesType" name="properties">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the message's properties
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
<xsd:element type="queuesType" name="queues" maxOccurs="1" minOccurs="1">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
a list of queues that hold a reference to this message
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
<xsd:element type="bodyType" name="body">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the body of the message (Base64 encoded using the Base64.DONT_BREAK_LINES and Base64.URL_SAFE options;
|
||||||
|
see org.apache.activemq.artemis.utils.Base64.encodeBytes(byte[], int, int, int)); stored in a CDATA
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
</xsd:sequence>
|
||||||
|
<xsd:attribute type="xsd:long" name="id" use="optional">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the queue's identifier
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:byte" name="priority" use="optional" default="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the priority of the message (between 0-9 inclusive)
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:long" name="expiration" use="optional" default="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
when this message will expire (epoch time value, 0 for never)
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:long" name="timestamp" use="optional" default="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
when this message was sent originally (epoch time value)
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="type" use="optional" default="DEFAULT">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the message's type; valid values: DEFAULT, OBJECT, TEXT, BYTES, MAP, STREAM
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
<xsd:attribute type="xsd:string" name="user-id" use="optional" default="">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the id of the user who sent the message
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:attribute>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="messagesType">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element type="messageType" name="message" minOccurs="0" maxOccurs="unbounded"/>
|
||||||
|
</xsd:sequence>
|
||||||
|
</xsd:complexType>
|
||||||
|
<xsd:complexType name="activemq-journalType">
|
||||||
|
<xsd:sequence>
|
||||||
|
<xsd:element type="bindingsType" name="bindings">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
a list of exported bindings
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
<xsd:element type="messagesType" name="messages">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
a list of exported messages
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
</xsd:sequence>
|
||||||
|
</xsd:complexType>
|
||||||
|
</xsd:schema>
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ListUtil {
|
||||||
|
public static List<String> toList(final String commaSeparatedString) {
|
||||||
|
List<String> list = new ArrayList<>();
|
||||||
|
if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) {
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
String[] values = commaSeparatedString.split(",");
|
||||||
|
for (String value : values) {
|
||||||
|
list.add(value.trim());
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
}
|
|
@ -104,6 +104,7 @@ import org.apache.activemq.artemis.core.transaction.impl.CoreTransactionDetail;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.utils.JsonLoader;
|
import org.apache.activemq.artemis.utils.JsonLoader;
|
||||||
|
import org.apache.activemq.artemis.utils.ListUtil;
|
||||||
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
import org.apache.activemq.artemis.utils.SecurityFormatter;
|
||||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||||
|
|
||||||
|
@ -622,7 +623,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
clearIO();
|
clearIO();
|
||||||
try {
|
try {
|
||||||
Set<RoutingType> set = new HashSet<>();
|
Set<RoutingType> set = new HashSet<>();
|
||||||
for (String routingType : toList(routingTypes)) {
|
for (String routingType : ListUtil.toList(routingTypes)) {
|
||||||
set.add(RoutingType.valueOf(routingType));
|
set.add(RoutingType.valueOf(routingType));
|
||||||
}
|
}
|
||||||
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
|
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
|
||||||
|
@ -2095,7 +2096,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
if (useDiscoveryGroup) {
|
if (useDiscoveryGroup) {
|
||||||
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
|
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
|
||||||
} else {
|
} else {
|
||||||
config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup));
|
config.setStaticConnectors(ListUtil.toList(staticConnectorsOrDiscoveryGroup));
|
||||||
}
|
}
|
||||||
|
|
||||||
server.deployBridge(config);
|
server.deployBridge(config);
|
||||||
|
@ -2132,7 +2133,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
if (useDiscoveryGroup) {
|
if (useDiscoveryGroup) {
|
||||||
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
|
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
|
||||||
} else {
|
} else {
|
||||||
config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup));
|
config.setStaticConnectors(ListUtil.toList(staticConnectorsOrDiscoveryGroup));
|
||||||
}
|
}
|
||||||
|
|
||||||
server.deployBridge(config);
|
server.deployBridge(config);
|
||||||
|
@ -2440,18 +2441,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
return new String[0];
|
return new String[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<String> toList(final String commaSeparatedString) {
|
|
||||||
List<String> list = new ArrayList<>();
|
|
||||||
if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) {
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
String[] values = commaSeparatedString.split(",");
|
|
||||||
for (String value : values) {
|
|
||||||
list.add(value.trim());
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
|
public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
|
||||||
if (!(notification.getType() instanceof CoreNotificationType))
|
if (!(notification.getType() instanceof CoreNotificationType))
|
||||||
|
|
|
@ -234,6 +234,9 @@ public class FileLockNodeManager extends NodeManager {
|
||||||
ByteBuffer bb = ByteBuffer.allocateDirect(1);
|
ByteBuffer bb = ByteBuffer.allocateDirect(1);
|
||||||
bb.put(status);
|
bb.put(status);
|
||||||
bb.position(0);
|
bb.position(0);
|
||||||
|
if (!channel.isOpen()) {
|
||||||
|
setUpServerLockFile();
|
||||||
|
}
|
||||||
channel.write(bb, 0);
|
channel.write(bb, 0);
|
||||||
channel.force(true);
|
channel.force(true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
|
||||||
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.jms.server.JMSServerManager;
|
import org.apache.activemq.artemis.jms.server.JMSServerManager;
|
||||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||||
|
@ -96,7 +99,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
public void testMessageProperties() throws Exception {
|
public void testMessageProperties() throws Exception {
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
|
|
||||||
|
@ -147,6 +150,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -167,14 +172,17 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
assertEquals(i, msg.getIntProperty("myIntProperty").intValue());
|
assertEquals(i, msg.getIntProperty("myIntProperty").intValue());
|
||||||
assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue());
|
assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue());
|
||||||
assertEquals(i, msg.getObjectProperty("myObjectProperty"));
|
assertEquals(i, msg.getObjectProperty("myObjectProperty"));
|
||||||
|
assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullObjectProperty")));
|
||||||
assertEquals(null, msg.getObjectProperty("myNullObjectProperty"));
|
assertEquals(null, msg.getObjectProperty("myNullObjectProperty"));
|
||||||
assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue());
|
assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue());
|
||||||
assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty"));
|
assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty"));
|
||||||
|
assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullStringProperty")));
|
||||||
assertEquals(null, msg.getStringProperty("myNullStringProperty"));
|
assertEquals(null, msg.getStringProperty("myNullStringProperty"));
|
||||||
assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty"));
|
assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty"));
|
||||||
assertEquals(special, msg.getStringProperty("mySpecialCharacters"));
|
assertEquals(special, msg.getStringProperty("mySpecialCharacters"));
|
||||||
assertEquals(new SimpleString("mySimpleStringPropertyValue_" + i), msg.getSimpleStringProperty(new SimpleString("mySimpleStringProperty")));
|
assertEquals(new SimpleString("mySimpleStringPropertyValue_" + i), msg.getSimpleStringProperty(new SimpleString("mySimpleStringProperty")));
|
||||||
assertEquals(null, msg.getSimpleStringProperty(new SimpleString("myNullSimpleStringProperty")));
|
assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullSimpleStringProperty")));
|
||||||
|
assertEquals(null, msg.getSimpleStringProperty("myNullSimpleStringProperty"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,7 +210,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
|
|
||||||
|
@ -239,6 +247,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -268,7 +278,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true);
|
ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true);
|
||||||
|
@ -293,6 +303,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -311,7 +323,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
|
ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
|
||||||
|
@ -336,6 +348,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -352,7 +366,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
|
|
||||||
|
@ -381,6 +395,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -396,8 +412,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
public void testBindingAttributes() throws Exception {
|
public void testBindingAttributes() throws Exception {
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue("addressName1", "queueName1", true);
|
session.createQueue("addressName1", RoutingType.MULTICAST, "queueName1", true);
|
||||||
session.createQueue("addressName1", "queueName2", "bob", true);
|
session.createQueue("addressName1", RoutingType.MULTICAST, "queueName2", "bob", true);
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
locator.close();
|
locator.close();
|
||||||
|
@ -417,6 +433,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
|
|
||||||
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
|
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
|
||||||
|
@ -452,7 +470,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
fileMessage.releaseResources();
|
fileMessage.releaseResources();
|
||||||
|
|
||||||
session.createQueue("A", "A", true);
|
session.createQueue("A", RoutingType.MULTICAST, "A", true);
|
||||||
|
|
||||||
ClientProducer prod = session.createProducer("A");
|
ClientProducer prod = session.createProducer("A");
|
||||||
|
|
||||||
|
@ -480,6 +498,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
session.close();
|
session.close();
|
||||||
session = factory.createSession(false, false);
|
session = factory.createSession(false, false);
|
||||||
|
@ -507,6 +527,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test");
|
ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test");
|
||||||
Connection c = cf.createConnection();
|
Connection c = cf.createConnection();
|
||||||
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
server.createQueue(SimpleString.toSimpleString("A"), RoutingType.ANYCAST, SimpleString.toSimpleString("A"), null, true, false);
|
||||||
MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A"));
|
MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A"));
|
||||||
p.setDeliveryMode(DeliveryMode.PERSISTENT);
|
p.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
StringBuilder stringBuilder = new StringBuilder();
|
StringBuilder stringBuilder = new StringBuilder();
|
||||||
|
@ -533,9 +554,11 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
factory = createSessionFactory(locator);
|
factory = createSessionFactory(locator);
|
||||||
ClientSession session = factory.createSession(false, true, true);
|
ClientSession session = factory.createSession(false, true, true);
|
||||||
|
|
||||||
ByteArrayInputStream inputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
xmlDataImporter.process(inputStream, session);
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
session.close();
|
session.close();
|
||||||
|
|
||||||
c = cf.createConnection();
|
c = cf.createConnection();
|
||||||
|
@ -553,8 +576,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
public void testPartialQueue() throws Exception {
|
public void testPartialQueue() throws Exception {
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue("myAddress", "myQueue1", true);
|
session.createQueue("myAddress", RoutingType.MULTICAST, "myQueue1", true);
|
||||||
session.createQueue("myAddress", "myQueue2", true);
|
session.createQueue("myAddress", RoutingType.MULTICAST, "myQueue2", true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer("myAddress");
|
ClientProducer producer = session.createProducer("myAddress");
|
||||||
|
|
||||||
|
@ -586,6 +609,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
consumer = session.createConsumer("myQueue1");
|
consumer = session.createConsumer("myQueue1");
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -618,8 +643,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
ClientSessionFactory factory = locator.createSessionFactory();
|
ClientSessionFactory factory = locator.createSessionFactory();
|
||||||
ClientSession session = factory.createSession(false, true, true);
|
ClientSession session = factory.createSession(false, true, true);
|
||||||
|
|
||||||
session.createQueue(MY_ADDRESS, MY_QUEUE, true);
|
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true);
|
||||||
session.createQueue(MY_ADDRESS, MY_QUEUE2, true);
|
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE2, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(MY_ADDRESS);
|
ClientProducer producer = session.createProducer(MY_ADDRESS);
|
||||||
|
|
||||||
|
@ -650,6 +675,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
|
|
||||||
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
|
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
|
||||||
|
@ -686,7 +713,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
factory = createSessionFactory(locator);
|
factory = createSessionFactory(locator);
|
||||||
ClientSession session = factory.createSession(false, true, true);
|
ClientSession session = factory.createSession(false, true, true);
|
||||||
|
|
||||||
session.createQueue(MY_ADDRESS, MY_QUEUE, true);
|
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(MY_ADDRESS);
|
ClientProducer producer = session.createProducer(MY_ADDRESS);
|
||||||
|
|
||||||
|
@ -715,6 +742,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
|
|
||||||
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
|
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
|
||||||
|
@ -747,7 +776,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
ClientSessionFactory factory = locator.createSessionFactory();
|
ClientSessionFactory factory = locator.createSessionFactory();
|
||||||
ClientSession session = factory.createSession(false, true, true);
|
ClientSession session = factory.createSession(false, true, true);
|
||||||
|
|
||||||
session.createQueue(MY_ADDRESS, MY_QUEUE, true);
|
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(MY_ADDRESS);
|
ClientProducer producer = session.createProducer(MY_ADDRESS);
|
||||||
|
|
||||||
|
@ -793,6 +822,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session);
|
xmlDataImporter.process(xmlInputStream, session);
|
||||||
|
|
||||||
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
|
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
|
||||||
|
@ -824,7 +855,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
public void testTransactional() throws Exception {
|
public void testTransactional() throws Exception {
|
||||||
ClientSession session = basicSetUp();
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
|
|
||||||
|
@ -850,6 +881,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -867,7 +900,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
ClientSessionFactory factory = locator.createSessionFactory();
|
ClientSessionFactory factory = locator.createSessionFactory();
|
||||||
ClientSession session = factory.createSession(false, true, true);
|
ClientSession session = factory.createSession(false, true, true);
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
|
|
||||||
|
@ -894,6 +927,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -916,7 +951,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
ClientSessionFactory factory = locator.createSessionFactory();
|
ClientSessionFactory factory = locator.createSessionFactory();
|
||||||
ClientSession session = factory.createSession(false, true, true);
|
ClientSession session = factory.createSession(false, true, true);
|
||||||
|
|
||||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
|
||||||
|
|
||||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||||
|
|
||||||
|
@ -948,6 +983,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
||||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||||
session.start();
|
session.start();
|
||||||
|
@ -963,4 +1000,44 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
||||||
locator.close();
|
locator.close();
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRoutingTypes() throws Exception {
|
||||||
|
SimpleString myAddress = SimpleString.toSimpleString("myAddress");
|
||||||
|
ClientSession session = basicSetUp();
|
||||||
|
|
||||||
|
Set<RoutingType> routingTypes = new HashSet<>();
|
||||||
|
routingTypes.add(RoutingType.ANYCAST);
|
||||||
|
routingTypes.add(RoutingType.MULTICAST);
|
||||||
|
|
||||||
|
session.createAddress(myAddress, routingTypes, false);
|
||||||
|
|
||||||
|
session.createQueue(myAddress.toString(), RoutingType.MULTICAST, "myQueue1", true);
|
||||||
|
session.createQueue(myAddress.toString(), RoutingType.MULTICAST, "myQueue2", true);
|
||||||
|
|
||||||
|
locator.close();
|
||||||
|
server.stop();
|
||||||
|
|
||||||
|
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
|
||||||
|
XmlDataExporter xmlDataExporter = new XmlDataExporter();
|
||||||
|
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
|
||||||
|
System.out.print(new String(xmlOutputStream.toByteArray()));
|
||||||
|
|
||||||
|
clearDataRecreateServerDirs();
|
||||||
|
server.start();
|
||||||
|
checkForLongs();
|
||||||
|
locator = createInVMNonHALocator();
|
||||||
|
factory = locator.createSessionFactory();
|
||||||
|
session = factory.createSession(false, false, true);
|
||||||
|
ClientSession managementSession = factory.createSession(false, true, true);
|
||||||
|
|
||||||
|
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||||
|
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||||
|
xmlDataImporter.validate(xmlInputStream);
|
||||||
|
xmlInputStream.reset();
|
||||||
|
xmlDataImporter.process(xmlInputStream, session, managementSession);
|
||||||
|
|
||||||
|
assertTrue(server.getAddressInfo(myAddress).getRoutingTypes().contains(RoutingType.ANYCAST));
|
||||||
|
assertTrue(server.getAddressInfo(myAddress).getRoutingTypes().contains(RoutingType.MULTICAST));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue