This closes #990

This commit is contained in:
Clebert Suconic 2017-02-06 16:29:44 -05:00
commit 0222b767a4
8 changed files with 508 additions and 709 deletions

View File

@ -29,13 +29,19 @@ public final class XmlDataConstants {
static final String XML_VERSION = "1.0";
static final String DOCUMENT_PARENT = "activemq-journal";
static final String BINDINGS_PARENT = "bindings";
static final String BINDINGS_CHILD = "binding";
static final String BINDING_ADDRESS = "address";
static final String BINDING_FILTER_STRING = "filter-string";
static final String BINDING_QUEUE_NAME = "queue-name";
static final String BINDING_ID = "id";
static final String JMS_CONNECTION_FACTORY = "jms-connection-factory";
static final String JMS_CONNECTION_FACTORIES = "jms-connection-factories";
static final String QUEUE_BINDINGS_CHILD = "queue-binding";
static final String QUEUE_BINDING_ADDRESS = "address";
static final String QUEUE_BINDING_FILTER_STRING = "filter-string";
static final String QUEUE_BINDING_NAME = "name";
static final String QUEUE_BINDING_ID = "id";
static final String QUEUE_BINDING_ROUTING_TYPE = "routing-type";
static final String ADDRESS_BINDINGS_CHILD = "address-binding";
static final String ADDRESS_BINDING_NAME = "name";
static final String ADDRESS_BINDING_ID = "id";
static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
static final String MESSAGES_PARENT = "messages";
static final String MESSAGES_CHILD = "message";
static final String MESSAGE_ID = "id";

View File

@ -31,7 +31,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -42,14 +41,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -74,19 +69,16 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -115,11 +107,7 @@ public final class XmlDataExporter extends OptionalLocking {
private final HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<>();
private final Map<String, PersistedConnectionFactory> jmsConnectionFactories = new ConcurrentHashMap<>();
private final Map<Pair<PersistedType, String>, PersistedDestination> jmsDestinations = new ConcurrentHashMap<>();
private final Map<Pair<PersistedType, String>, PersistedBindings> jmsJNDI = new ConcurrentHashMap<>();
private final HashMap<Long, PersistentAddressBindingEncoding> addressBindings = new HashMap<>();
long messagesPrinted = 0L;
@ -161,7 +149,6 @@ public final class XmlDataExporter extends OptionalLocking {
private void writeXMLData() throws Exception {
long start = System.currentTimeMillis();
getBindings();
getJmsBindings();
processMessageJournal();
printDataAsXML();
ActiveMQServerLogger.LOGGER.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
@ -298,58 +285,6 @@ public final class XmlDataExporter extends OptionalLocking {
}
}
private void getJmsBindings() throws Exception {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal jmsJournal = new JournalImpl(1024 * 1024, 2, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
jmsJournal.start();
List<RecordInfo> data = new ArrayList<>();
ArrayList<PreparedTransactionInfo> list = new ArrayList<>();
ActiveMQServerLogger.LOGGER.debug("Reading jms bindings journal from " + config.getBindingsDirectory());
jmsJournal.load(data, list, null);
for (RecordInfo record : data) {
long id = record.id;
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(record.data);
byte rec = record.getUserRecordType();
if (rec == JMSJournalStorageManagerImpl.CF_RECORD) {
PersistedConnectionFactory cf = new PersistedConnectionFactory();
cf.decode(buffer);
cf.setId(id);
ActiveMQServerLogger.LOGGER.info("Found JMS connection factory: " + cf.getName());
jmsConnectionFactories.put(cf.getName(), cf);
} else if (rec == JMSJournalStorageManagerImpl.DESTINATION_RECORD) {
PersistedDestination destination = new PersistedDestination();
destination.decode(buffer);
destination.setId(id);
ActiveMQServerLogger.LOGGER.info("Found JMS destination: " + destination.getName());
jmsDestinations.put(new Pair<>(destination.getType(), destination.getName()), destination);
} else if (rec == JMSJournalStorageManagerImpl.BINDING_RECORD) {
PersistedBindings jndi = new PersistedBindings();
jndi.decode(buffer);
jndi.setId(id);
Pair<PersistedType, String> key = new Pair<>(jndi.getType(), jndi.getName());
StringBuilder builder = new StringBuilder();
for (String binding : jndi.getBindings()) {
builder.append(binding).append(" ");
}
ActiveMQServerLogger.LOGGER.info("Found JMS JNDI binding data for " + jndi.getType() + " " + jndi.getName() + ": " + builder.toString());
jmsJNDI.put(key, jndi);
} else {
throw new IllegalStateException("Invalid record type " + rec);
}
}
}
/**
* Open the bindings journal and extract all bindings data.
*
@ -370,6 +305,9 @@ public final class XmlDataExporter extends OptionalLocking {
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) {
PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
queueBindings.put(bindingEncoding.getId(), bindingEncoding);
} else if (info.getUserRecordType() == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = (PersistentAddressBindingEncoding) DescribeJournal.newObjectEncoding(info, null);
addressBindings.put(bindingEncoding.getId(), bindingEncoding);
}
}
@ -381,8 +319,6 @@ public final class XmlDataExporter extends OptionalLocking {
xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
printBindingsAsXML();
printJmsConnectionFactoriesAsXML();
printJmsDestinationsAsXML();
printAllMessagesAsXML();
xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
xmlWriter.writeEndDocument();
@ -395,256 +331,35 @@ public final class XmlDataExporter extends OptionalLocking {
private void printBindingsAsXML() throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
for (Map.Entry<Long, PersistentAddressBindingEncoding> addressBindingEncodingEntry : addressBindings.entrySet()) {
PersistentAddressBindingEncoding bindingEncoding = addressBindings.get(addressBindingEncodingEntry.getKey());
xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
StringBuilder routingTypes = new StringBuilder();
for (RoutingType routingType : bindingEncoding.getRoutingTypes()) {
routingTypes.append(routingType.toString()).append(", ");
}
xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE, routingTypes.toString().substring(0, routingTypes.length() - 2));
xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_NAME, bindingEncoding.getName().toString());
xmlWriter.writeAttribute(XmlDataConstants.ADDRESS_BINDING_ID, Long.toString(bindingEncoding.getId()));
bindingsPrinted++;
}
for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet()) {
PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD);
xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString());
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, bindingEncoding.getAddress().toString());
String filter = "";
if (bindingEncoding.getFilterString() != null) {
filter = bindingEncoding.getFilterString().toString();
}
xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, filter);
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_NAME, bindingEncoding.getQueueName().toString());
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ID, Long.toString(bindingEncoding.getId()));
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE, RoutingType.getType(bindingEncoding.getRoutingType()).toString());
bindingsPrinted++;
}
xmlWriter.writeEndElement(); // end BINDINGS_PARENT
}
private void printJmsConnectionFactoriesAsXML() throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORIES);
for (String jmsConnectionFactoryKey : jmsConnectionFactories.keySet()) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY);
PersistedConnectionFactory jmsConnectionFactory = jmsConnectionFactories.get(jmsConnectionFactoryKey);
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_NAME);
xmlWriter.writeCharacters(jmsConnectionFactory.getName());
xmlWriter.writeEndElement();
String clientID = jmsConnectionFactory.getConfig().getClientID();
if (clientID != null) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID);
xmlWriter.writeCharacters(clientID);
xmlWriter.writeEndElement();
}
long callFailoverTimeout = jmsConnectionFactory.getConfig().getCallFailoverTimeout();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT);
xmlWriter.writeCharacters(Long.toString(callFailoverTimeout));
xmlWriter.writeEndElement();
long callTimeout = jmsConnectionFactory.getConfig().getCallTimeout();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT);
xmlWriter.writeCharacters(Long.toString(callTimeout));
xmlWriter.writeEndElement();
long clientFailureCheckPeriod = jmsConnectionFactory.getConfig().getClientFailureCheckPeriod();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD);
xmlWriter.writeCharacters(Long.toString(clientFailureCheckPeriod));
xmlWriter.writeEndElement();
int confirmationWindowSize = jmsConnectionFactory.getConfig().getConfirmationWindowSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE);
xmlWriter.writeCharacters(Integer.toString(confirmationWindowSize));
xmlWriter.writeEndElement();
long connectionTTL = jmsConnectionFactory.getConfig().getConnectionTTL();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL);
xmlWriter.writeCharacters(Long.toString(connectionTTL));
xmlWriter.writeEndElement();
long consumerMaxRate = jmsConnectionFactory.getConfig().getConsumerMaxRate();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE);
xmlWriter.writeCharacters(Long.toString(consumerMaxRate));
xmlWriter.writeEndElement();
long consumerWindowSize = jmsConnectionFactory.getConfig().getConsumerWindowSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE);
xmlWriter.writeCharacters(Long.toString(consumerWindowSize));
xmlWriter.writeEndElement();
String discoveryGroupName = jmsConnectionFactory.getConfig().getDiscoveryGroupName();
if (discoveryGroupName != null) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME);
xmlWriter.writeCharacters(discoveryGroupName);
xmlWriter.writeEndElement();
}
int dupsOKBatchSize = jmsConnectionFactory.getConfig().getDupsOKBatchSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE);
xmlWriter.writeCharacters(Integer.toString(dupsOKBatchSize));
xmlWriter.writeEndElement();
JMSFactoryType factoryType = jmsConnectionFactory.getConfig().getFactoryType();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE);
xmlWriter.writeCharacters(Integer.toString(factoryType.intValue()));
xmlWriter.writeEndElement();
String groupID = jmsConnectionFactory.getConfig().getGroupID();
if (groupID != null) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID);
xmlWriter.writeCharacters(groupID);
xmlWriter.writeEndElement();
}
String loadBalancingPolicyClassName = jmsConnectionFactory.getConfig().getLoadBalancingPolicyClassName();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME);
xmlWriter.writeCharacters(loadBalancingPolicyClassName);
xmlWriter.writeEndElement();
long maxRetryInterval = jmsConnectionFactory.getConfig().getMaxRetryInterval();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL);
xmlWriter.writeCharacters(Long.toString(maxRetryInterval));
xmlWriter.writeEndElement();
long minLargeMessageSize = jmsConnectionFactory.getConfig().getMinLargeMessageSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE);
xmlWriter.writeCharacters(Long.toString(minLargeMessageSize));
xmlWriter.writeEndElement();
long producerMaxRate = jmsConnectionFactory.getConfig().getProducerMaxRate();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE);
xmlWriter.writeCharacters(Long.toString(producerMaxRate));
xmlWriter.writeEndElement();
long producerWindowSize = jmsConnectionFactory.getConfig().getProducerWindowSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE);
xmlWriter.writeCharacters(Long.toString(producerWindowSize));
xmlWriter.writeEndElement();
long reconnectAttempts = jmsConnectionFactory.getConfig().getReconnectAttempts();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS);
xmlWriter.writeCharacters(Long.toString(reconnectAttempts));
xmlWriter.writeEndElement();
long retryInterval = jmsConnectionFactory.getConfig().getRetryInterval();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL);
xmlWriter.writeCharacters(Long.toString(retryInterval));
xmlWriter.writeEndElement();
double retryIntervalMultiplier = jmsConnectionFactory.getConfig().getRetryIntervalMultiplier();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER);
xmlWriter.writeCharacters(Double.toString(retryIntervalMultiplier));
xmlWriter.writeEndElement();
long scheduledThreadPoolMaxSize = jmsConnectionFactory.getConfig().getScheduledThreadPoolMaxSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE);
xmlWriter.writeCharacters(Long.toString(scheduledThreadPoolMaxSize));
xmlWriter.writeEndElement();
long threadPoolMaxSize = jmsConnectionFactory.getConfig().getThreadPoolMaxSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE);
xmlWriter.writeCharacters(Long.toString(threadPoolMaxSize));
xmlWriter.writeEndElement();
long transactionBatchSize = jmsConnectionFactory.getConfig().getTransactionBatchSize();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE);
xmlWriter.writeCharacters(Long.toString(transactionBatchSize));
xmlWriter.writeEndElement();
boolean autoGroup = jmsConnectionFactory.getConfig().isAutoGroup();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP);
xmlWriter.writeCharacters(Boolean.toString(autoGroup));
xmlWriter.writeEndElement();
boolean blockOnAcknowledge = jmsConnectionFactory.getConfig().isBlockOnAcknowledge();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE);
xmlWriter.writeCharacters(Boolean.toString(blockOnAcknowledge));
xmlWriter.writeEndElement();
boolean blockOnDurableSend = jmsConnectionFactory.getConfig().isBlockOnDurableSend();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND);
xmlWriter.writeCharacters(Boolean.toString(blockOnDurableSend));
xmlWriter.writeEndElement();
boolean blockOnNonDurableSend = jmsConnectionFactory.getConfig().isBlockOnNonDurableSend();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND);
xmlWriter.writeCharacters(Boolean.toString(blockOnNonDurableSend));
xmlWriter.writeEndElement();
boolean cacheLargeMessagesClient = jmsConnectionFactory.getConfig().isCacheLargeMessagesClient();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT);
xmlWriter.writeCharacters(Boolean.toString(cacheLargeMessagesClient));
xmlWriter.writeEndElement();
boolean compressLargeMessages = jmsConnectionFactory.getConfig().isCompressLargeMessages();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES);
xmlWriter.writeCharacters(Boolean.toString(compressLargeMessages));
xmlWriter.writeEndElement();
boolean failoverOnInitialConnection = jmsConnectionFactory.getConfig().isFailoverOnInitialConnection();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION);
xmlWriter.writeCharacters(Boolean.toString(failoverOnInitialConnection));
xmlWriter.writeEndElement();
boolean ha = jmsConnectionFactory.getConfig().isHA();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_HA);
xmlWriter.writeCharacters(Boolean.toString(ha));
xmlWriter.writeEndElement();
boolean preAcknowledge = jmsConnectionFactory.getConfig().isPreAcknowledge();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE);
xmlWriter.writeCharacters(Boolean.toString(preAcknowledge));
xmlWriter.writeEndElement();
boolean useGlobalPools = jmsConnectionFactory.getConfig().isUseGlobalPools();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS);
xmlWriter.writeCharacters(Boolean.toString(useGlobalPools));
xmlWriter.writeEndElement();
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTORS);
for (String connector : jmsConnectionFactory.getConfig().getConnectorNames()) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR);
xmlWriter.writeCharacters(connector);
xmlWriter.writeEndElement();
}
xmlWriter.writeEndElement();
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRIES);
PersistedBindings jndi = jmsJNDI.get(new Pair<>(PersistedType.ConnectionFactory, jmsConnectionFactory.getName()));
for (String jndiEntry : jndi.getBindings()) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRY);
xmlWriter.writeCharacters(jndiEntry);
xmlWriter.writeEndElement();
}
xmlWriter.writeEndElement(); // end jndi-entries
xmlWriter.writeEndElement(); // end JMS_CONNECTION_FACTORY
}
xmlWriter.writeEndElement();
}
private void printJmsDestinationsAsXML() throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATIONS);
for (Pair<PersistedType, String> jmsDestinationsKey : jmsDestinations.keySet()) {
PersistedDestination jmsDestination = jmsDestinations.get(jmsDestinationsKey);
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION);
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_NAME);
xmlWriter.writeCharacters(jmsDestination.getName());
xmlWriter.writeEndElement();
String selector = jmsDestination.getSelector();
if (selector != null && selector.length() != 0) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_SELECTOR);
xmlWriter.writeCharacters(selector);
xmlWriter.writeEndElement();
}
xmlWriter.writeStartElement(XmlDataConstants.JMS_DESTINATION_TYPE);
xmlWriter.writeCharacters(jmsDestination.getType().toString());
xmlWriter.writeEndElement();
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRIES);
PersistedBindings jndi = jmsJNDI.get(new Pair<>(jmsDestination.getType(), jmsDestination.getName()));
for (String jndiEntry : jndi.getBindings()) {
xmlWriter.writeStartElement(XmlDataConstants.JMS_JNDI_ENTRY);
xmlWriter.writeCharacters(jndiEntry);
xmlWriter.writeEndElement();
}
xmlWriter.writeEndElement(); // end jndi-entries
xmlWriter.writeEndElement(); // end JMS_CONNECTION_FACTORY
}
xmlWriter.writeEndElement();
}
private void printAllMessagesAsXML() throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
@ -822,7 +537,10 @@ public final class XmlDataExporter extends OptionalLocking {
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value == null ? XmlDataConstants.NULL : value.toString());
}
if (value instanceof Boolean) {
// if the value is null then we can't really know what it is so just set the type to the most generic thing
if (value == null) {
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
} else if (value instanceof Boolean) {
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
} else if (value instanceof Byte) {
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);

View File

@ -16,10 +16,15 @@
*/
package org.apache.activemq.artemis.cli.commands.tools;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stax.StAXSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@ -27,10 +32,15 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import io.airlift.airline.Command;
@ -53,7 +63,10 @@ import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.ListUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
@ -75,7 +88,7 @@ public final class XmlDataImporter extends ActionAbstract {
// this session is really only needed if the "session" variable does not auto-commit sends
ClientSession managementSession;
boolean localSession;
boolean localSession = false;
final Map<String, String> addressMap = new HashMap<>();
@ -164,20 +177,21 @@ public final class XmlDataImporter extends ActionAbstract {
} else {
this.managementSession = session;
}
localSession = false;
processXml();
}
public void process(InputStream inputStream, String host, int port, boolean transactional) throws Exception {
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
HashMap<String, Object> connectionParams = new HashMap<>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, Integer.toString(port));
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
ClientSessionFactory sf = serverLocator.createSessionFactory();
ClientSession session;
ClientSession managementSession;
if (user != null || password != null) {
session = sf.createSession(user, password, false, !transactional, true, false, 0);
managementSession = sf.createSession(user, password, false, true, true, false, 0);
@ -187,7 +201,30 @@ public final class XmlDataImporter extends ActionAbstract {
}
localSession = true;
processXml();
process(inputStream, session, managementSession);
}
public void validate(String file) throws Exception {
validate(new FileInputStream(file));
}
public void validate(InputStream inputStream) throws Exception {
XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
Schema schema = factory.newSchema(XmlDataImporter.findResource("schema/artemis-import-export.xsd"));
Validator validator = schema.newValidator();
validator.validate(new StAXSource(reader));
reader.close();
}
private static URL findResource(final String resourceName) {
return AccessController.doPrivileged(new PrivilegedAction<URL>() {
@Override
public URL run() {
return ClassloadingUtil.findResource(resourceName);
}
});
}
private void processXml() throws Exception {
@ -197,14 +234,12 @@ public final class XmlDataImporter extends ActionAbstract {
logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
}
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) {
if (XmlDataConstants.QUEUE_BINDINGS_CHILD.equals(reader.getLocalName())) {
bindQueue();
} else if (XmlDataConstants.ADDRESS_BINDINGS_CHILD.equals(reader.getLocalName())) {
bindAddress();
} else if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
processMessage();
} else if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) {
createJmsConnectionFactories();
} else if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) {
createJmsDestinations();
}
}
reader.next();
@ -396,6 +431,10 @@ public final class XmlDataImporter extends ActionAbstract {
}
}
if (value.equals(XmlDataConstants.NULL)) {
value = null;
}
switch (propertyType) {
case XmlDataConstants.PROPERTY_TYPE_SHORT:
message.putShortProperty(key, Short.parseShort(value));
@ -407,7 +446,7 @@ public final class XmlDataImporter extends ActionAbstract {
message.putByteProperty(key, Byte.parseByte(value));
break;
case XmlDataConstants.PROPERTY_TYPE_BYTES:
message.putBytesProperty(key, decode(value));
message.putBytesProperty(key, value == null ? null : decode(value));
break;
case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
message.putDoubleProperty(key, Double.parseDouble(value));
@ -422,16 +461,10 @@ public final class XmlDataImporter extends ActionAbstract {
message.putLongProperty(key, Long.parseLong(value));
break;
case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
if (!value.equals(XmlDataConstants.NULL)) {
realSimpleStringValue = new SimpleString(value);
}
message.putStringProperty(new SimpleString(key), realSimpleStringValue);
message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
break;
case XmlDataConstants.PROPERTY_TYPE_STRING:
if (!value.equals(XmlDataConstants.NULL)) {
realStringValue = value;
}
message.putStringProperty(key, realStringValue);
message.putStringProperty(key, value);
break;
}
}
@ -509,26 +542,30 @@ public final class XmlDataImporter extends ActionAbstract {
String queueName = "";
String address = "";
String filter = "";
String routingType = "";
for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
switch (attributeName) {
case XmlDataConstants.BINDING_ADDRESS:
case XmlDataConstants.QUEUE_BINDING_ADDRESS:
address = reader.getAttributeValue(i);
break;
case XmlDataConstants.BINDING_QUEUE_NAME:
case XmlDataConstants.QUEUE_BINDING_NAME:
queueName = reader.getAttributeValue(i);
break;
case XmlDataConstants.BINDING_FILTER_STRING:
case XmlDataConstants.QUEUE_BINDING_FILTER_STRING:
filter = reader.getAttributeValue(i);
break;
case XmlDataConstants.QUEUE_BINDING_ROUTING_TYPE:
routingType = reader.getAttributeValue(i);
break;
}
}
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
if (!queueQuery.isExists()) {
session.createQueue(address, queueName, filter, true);
session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true);
if (logger.isDebugEnabled()) {
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
@ -541,350 +578,36 @@ public final class XmlDataImporter extends ActionAbstract {
addressMap.put(queueName, address);
}
private void createJmsConnectionFactories() throws Exception {
boolean endLoop = false;
private void bindAddress() throws Exception {
String addressName = "";
String routingTypes = "";
while (reader.hasNext()) {
int eventType = reader.getEventType();
switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) {
createJmsConnectionFactory();
}
for (int i = 0; i < reader.getAttributeCount(); i++) {
String attributeName = reader.getAttributeLocalName(i);
switch (attributeName) {
case XmlDataConstants.ADDRESS_BINDING_NAME:
addressName = reader.getAttributeValue(i);
break;
case XMLStreamConstants.END_ELEMENT:
if (XmlDataConstants.JMS_CONNECTION_FACTORIES.equals(reader.getLocalName())) {
endLoop = true;
}
case XmlDataConstants.ADDRESS_BINDING_ROUTING_TYPE:
routingTypes = reader.getAttributeValue(i);
break;
}
if (endLoop) {
break;
}
reader.next();
}
}
private void createJmsDestinations() throws Exception {
boolean endLoop = false;
while (reader.hasNext()) {
int eventType = reader.getEventType();
switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) {
createJmsDestination();
}
break;
case XMLStreamConstants.END_ELEMENT:
if (XmlDataConstants.JMS_DESTINATIONS.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
if (endLoop) {
break;
}
reader.next();
}
}
private void createJmsConnectionFactory() throws Exception {
String name = "";
String callFailoverTimeout = "";
String callTimeout = "";
String clientFailureCheckPeriod = "";
String clientId = "";
String confirmationWindowSize = "";
String connectionTtl = "";
String connectors = "";
String consumerMaxRate = "";
String consumerWindowSize = "";
String discoveryGroupName = "";
String dupsOkBatchSize = "";
String groupId = "";
String loadBalancingPolicyClassName = "";
String maxRetryInterval = "";
String minLargeMessageSize = "";
String producerMaxRate = "";
String producerWindowSize = "";
String reconnectAttempts = "";
String retryInterval = "";
String retryIntervalMultiplier = "";
String scheduledThreadMaxPoolSize = "";
String threadMaxPoolSize = "";
String transactionBatchSize = "";
String type = "";
String entries = "";
String autoGroup = "";
String blockOnAcknowledge = "";
String blockOnDurableSend = "";
String blockOnNonDurableSend = "";
String cacheLargeMessagesClient = "";
String compressLargeMessages = "";
String failoverOnInitialConnection = "";
String ha = "";
String preacknowledge = "";
String useGlobalPools = "";
boolean endLoop = false;
while (reader.hasNext()) {
int eventType = reader.getEventType();
switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) {
callFailoverTimeout = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) {
callTimeout = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory callTimeout: " + callTimeout);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) {
clientFailureCheckPeriod = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) {
clientId = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory clientId: " + clientId);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) {
confirmationWindowSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) {
connectionTtl = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory connectionTtl: " + connectionTtl);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
connectors = getConnectors();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory getLocalName: " + connectors);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) {
consumerMaxRate = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) {
consumerWindowSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) {
discoveryGroupName = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) {
dupsOkBatchSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) {
groupId = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory groupId: " + groupId);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) {
loadBalancingPolicyClassName = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) {
maxRetryInterval = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) {
minLargeMessageSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) {
name = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory name: " + name);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) {
producerMaxRate = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory producerMaxRate: " + producerMaxRate);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) {
producerWindowSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory producerWindowSize: " + producerWindowSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) {
reconnectAttempts = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) {
retryInterval = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory retryInterval: " + retryInterval);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) {
retryIntervalMultiplier = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
scheduledThreadMaxPoolSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
threadMaxPoolSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) {
transactionBatchSize = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) {
type = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory type: " + type);
}
} else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
entries = getEntries();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory entries: " + entries);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) {
autoGroup = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory autoGroup: " + autoGroup);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) {
blockOnAcknowledge = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) {
blockOnDurableSend = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) {
blockOnNonDurableSend = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) {
cacheLargeMessagesClient = reader.getElementText();
ActiveMQServerLogger.LOGGER.info("JMS connection factory " + name + " cacheLargeMessagesClient: " + cacheLargeMessagesClient);
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) {
compressLargeMessages = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) {
failoverOnInitialConnection = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) {
ha = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory ha: " + ha);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) {
preacknowledge = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory preacknowledge: " + preacknowledge);
}
} else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) {
useGlobalPools = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory useGlobalPools: " + useGlobalPools);
}
}
break;
case XMLStreamConstants.END_ELEMENT:
if (XmlDataConstants.JMS_CONNECTION_FACTORY.equals(reader.getLocalName())) {
endLoop = true;
}
break;
}
if (endLoop) {
break;
}
reader.next();
}
ActiveMQServerLogger.LOGGER.error("Ignoring Connection Factory " + name);
}
ClientSession.AddressQuery addressQuery = session.addressQuery(new SimpleString(addressName));
private void createJmsDestination() throws Exception {
String name = "";
String selector = "";
String entries = "";
String type = "";
boolean endLoop = false;
while (reader.hasNext()) {
int eventType = reader.getEventType();
switch (eventType) {
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) {
name = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS destination name: " + name);
}
} else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) {
selector = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS destination selector: " + selector);
}
} else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) {
type = reader.getElementText();
if (logger.isDebugEnabled()) {
logger.debug("JMS destination type: " + type);
}
} else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
entries = getEntries();
}
break;
case XMLStreamConstants.END_ELEMENT:
if (XmlDataConstants.JMS_DESTINATION.equals(reader.getLocalName())) {
endLoop = true;
}
break;
if (!addressQuery.isExists()) {
Set<RoutingType> set = new HashSet<>();
for (String routingType : ListUtil.toList(routingTypes)) {
set.add(RoutingType.valueOf(routingType));
}
if (endLoop) {
break;
session.createAddress(SimpleString.toSimpleString(addressName), set, false);
if (logger.isDebugEnabled()) {
logger.debug("Binding address(name=" + addressName + ", routingTypes=" + routingTypes + ")");
}
reader.next();
}
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
ClientMessage managementMessage = managementSession.createMessage(false);
if ("Queue".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.BROKER, "createQueue", name, entries, selector);
} else if ("Topic".equals(type)) {
ManagementHelper.putOperationInvocation(managementMessage, ResourceNames.BROKER, "createAddress", name, entries);
}
managementSession.start();
ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) {
if (logger.isDebugEnabled()) {
logger.debug("Created " + type.toLowerCase() + " " + name);
}
} else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Binding " + addressName + " already exists so won't re-bind.");
}
}
}

View File

@ -0,0 +1,248 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
attributeFormDefault="unqualified"
elementFormDefault="qualified"
version="1.0">
<xsd:element name="activemq-journal" type="activemq-journalType"/>
<xsd:complexType name="address-bindingType">
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute type="xsd:string" name="routing-types" use="required">
<xsd:annotation>
<xsd:documentation>
the routing types supported by the address; valid values: MULTICAST, ANYCAST
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="name" use="required">
<xsd:annotation>
<xsd:documentation>
the name of the address binding
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:long" name="id" use="optional">
<xsd:annotation>
<xsd:documentation>
the id of the address binding
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="queue-bindingType">
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute type="xsd:string" name="address" use="required">
<xsd:annotation>
<xsd:documentation>
the address name of the queue binding
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="filter-string" use="optional">
<xsd:annotation>
<xsd:documentation>
the binding's filter (i.e. if using JMS selector syntax see
org.apache.activemq.artemis.utils.SelectorTranslator.convertToActiveMQFilterString for
conversion semantics)
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="name" use="required">
<xsd:annotation>
<xsd:documentation>
the queue name of the binding
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:long" name="id" use="optional">
<xsd:annotation>
<xsd:documentation>
the binding's identifier
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="routing-type" use="required">
<xsd:annotation>
<xsd:documentation>
the binding's routing type
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="bindingsType">
<xsd:sequence>
<xsd:element type="address-bindingType" name="address-binding" maxOccurs="unbounded" minOccurs="0"/>
<xsd:element type="queue-bindingType" name="queue-binding" maxOccurs="unbounded" minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="propertyType">
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute type="xsd:string" name="name" use="required">
<xsd:annotation>
<xsd:documentation>
the property's name
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="value" use="required">
<xsd:annotation>
<xsd:documentation>
the property's value; byte arrays are Base64 encoded the same way as the message's body
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="type" use="optional">
<xsd:annotation>
<xsd:documentation>
the property's type; valid values: boolean, byte, bytes, short, integer, long, float, double,
string, simple-string
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="propertiesType">
<xsd:sequence>
<xsd:element type="propertyType" name="property" maxOccurs="unbounded" minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="queueType">
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute type="xsd:string" name="name">
<xsd:annotation>
<xsd:documentation>
the queue's name
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="queuesType">
<xsd:sequence>
<xsd:element type="queueType" name="queue"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="bodyType">
<xsd:simpleContent>
<xsd:extension base="xsd:string">
<xsd:attribute type="xsd:boolean" name="isLarge" use="optional"/>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="messageType">
<xsd:sequence>
<xsd:element type="propertiesType" name="properties">
<xsd:annotation>
<xsd:documentation>
the message's properties
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element type="queuesType" name="queues" maxOccurs="1" minOccurs="1">
<xsd:annotation>
<xsd:documentation>
a list of queues that hold a reference to this message
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element type="bodyType" name="body">
<xsd:annotation>
<xsd:documentation>
the body of the message (Base64 encoded using the Base64.DONT_BREAK_LINES and Base64.URL_SAFE options;
see org.apache.activemq.artemis.utils.Base64.encodeBytes(byte[], int, int, int)); stored in a CDATA
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attribute type="xsd:long" name="id" use="optional">
<xsd:annotation>
<xsd:documentation>
the queue's identifier
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:byte" name="priority" use="optional" default="0">
<xsd:annotation>
<xsd:documentation>
the priority of the message (between 0-9 inclusive)
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:long" name="expiration" use="optional" default="0">
<xsd:annotation>
<xsd:documentation>
when this message will expire (epoch time value, 0 for never)
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:long" name="timestamp" use="optional" default="0">
<xsd:annotation>
<xsd:documentation>
when this message was sent originally (epoch time value)
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="type" use="optional" default="DEFAULT">
<xsd:annotation>
<xsd:documentation>
the message's type; valid values: DEFAULT, OBJECT, TEXT, BYTES, MAP, STREAM
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute type="xsd:string" name="user-id" use="optional" default="">
<xsd:annotation>
<xsd:documentation>
the id of the user who sent the message
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="messagesType">
<xsd:sequence>
<xsd:element type="messageType" name="message" minOccurs="0" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="activemq-journalType">
<xsd:sequence>
<xsd:element type="bindingsType" name="bindings">
<xsd:annotation>
<xsd:documentation>
a list of exported bindings
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element type="messagesType" name="messages">
<xsd:annotation>
<xsd:documentation>
a list of exported messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.ArrayList;
import java.util.List;
public class ListUtil {
public static List<String> toList(final String commaSeparatedString) {
List<String> list = new ArrayList<>();
if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) {
return list;
}
String[] values = commaSeparatedString.split(",");
for (String value : values) {
list.add(value.trim());
}
return list;
}
}

View File

@ -104,6 +104,7 @@ import org.apache.activemq.artemis.core.transaction.impl.CoreTransactionDetail;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.ListUtil;
import org.apache.activemq.artemis.utils.SecurityFormatter;
import org.apache.activemq.artemis.utils.TypedProperties;
@ -622,7 +623,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
Set<RoutingType> set = new HashSet<>();
for (String routingType : toList(routingTypes)) {
for (String routingType : ListUtil.toList(routingTypes)) {
set.add(RoutingType.valueOf(routingType));
}
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
@ -2095,7 +2096,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
} else {
config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup));
config.setStaticConnectors(ListUtil.toList(staticConnectorsOrDiscoveryGroup));
}
server.deployBridge(config);
@ -2132,7 +2133,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
} else {
config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup));
config.setStaticConnectors(ListUtil.toList(staticConnectorsOrDiscoveryGroup));
}
server.deployBridge(config);
@ -2440,18 +2441,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
return new String[0];
}
private static List<String> toList(final String commaSeparatedString) {
List<String> list = new ArrayList<>();
if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0) {
return list;
}
String[] values = commaSeparatedString.split(",");
for (String value : values) {
list.add(value.trim());
}
return list;
}
@Override
public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))

View File

@ -234,6 +234,9 @@ public class FileLockNodeManager extends NodeManager {
ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status);
bb.position(0);
if (!channel.isOpen()) {
setUpServerLockFile();
}
channel.write(bb, 0);
channel.force(true);
}

View File

@ -27,6 +27,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.Message;
@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
@ -96,7 +99,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
public void testMessageProperties() throws Exception {
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
@ -147,6 +150,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -167,14 +172,17 @@ public class XmlImportExportTest extends ActiveMQTestBase {
assertEquals(i, msg.getIntProperty("myIntProperty").intValue());
assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue());
assertEquals(i, msg.getObjectProperty("myObjectProperty"));
assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullObjectProperty")));
assertEquals(null, msg.getObjectProperty("myNullObjectProperty"));
assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue());
assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty"));
assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullStringProperty")));
assertEquals(null, msg.getStringProperty("myNullStringProperty"));
assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty"));
assertEquals(special, msg.getStringProperty("mySpecialCharacters"));
assertEquals(new SimpleString("mySimpleStringPropertyValue_" + i), msg.getSimpleStringProperty(new SimpleString("mySimpleStringProperty")));
assertEquals(null, msg.getSimpleStringProperty(new SimpleString("myNullSimpleStringProperty")));
assertEquals(true, msg.getPropertyNames().contains(SimpleString.toSimpleString("myNullSimpleStringProperty")));
assertEquals(null, msg.getSimpleStringProperty("myNullSimpleStringProperty"));
}
}
@ -202,7 +210,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
@ -239,6 +247,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -268,7 +278,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true);
@ -293,6 +303,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -311,7 +323,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
@ -336,6 +348,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -352,7 +366,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
@ -381,6 +395,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -396,8 +412,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
public void testBindingAttributes() throws Exception {
ClientSession session = basicSetUp();
session.createQueue("addressName1", "queueName1", true);
session.createQueue("addressName1", "queueName2", "bob", true);
session.createQueue("addressName1", RoutingType.MULTICAST, "queueName1", true);
session.createQueue("addressName1", RoutingType.MULTICAST, "queueName2", "bob", true);
session.close();
locator.close();
@ -417,6 +433,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
@ -452,7 +470,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.releaseResources();
session.createQueue("A", "A", true);
session.createQueue("A", RoutingType.MULTICAST, "A", true);
ClientProducer prod = session.createProducer("A");
@ -480,6 +498,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
session.close();
session = factory.createSession(false, false);
@ -507,6 +527,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test");
Connection c = cf.createConnection();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
server.createQueue(SimpleString.toSimpleString("A"), RoutingType.ANYCAST, SimpleString.toSimpleString("A"), null, true, false);
MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A"));
p.setDeliveryMode(DeliveryMode.PERSISTENT);
StringBuilder stringBuilder = new StringBuilder();
@ -533,9 +554,11 @@ public class XmlImportExportTest extends ActiveMQTestBase {
factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, true, true);
ByteArrayInputStream inputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(inputStream, session);
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
session.close();
c = cf.createConnection();
@ -553,8 +576,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
public void testPartialQueue() throws Exception {
ClientSession session = basicSetUp();
session.createQueue("myAddress", "myQueue1", true);
session.createQueue("myAddress", "myQueue2", true);
session.createQueue("myAddress", RoutingType.MULTICAST, "myQueue1", true);
session.createQueue("myAddress", RoutingType.MULTICAST, "myQueue2", true);
ClientProducer producer = session.createProducer("myAddress");
@ -586,6 +609,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
consumer = session.createConsumer("myQueue1");
session.start();
@ -618,8 +643,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, true, true);
session.createQueue(MY_ADDRESS, MY_QUEUE, true);
session.createQueue(MY_ADDRESS, MY_QUEUE2, true);
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true);
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE2, true);
ClientProducer producer = session.createProducer(MY_ADDRESS);
@ -650,6 +675,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -686,7 +713,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, true, true);
session.createQueue(MY_ADDRESS, MY_QUEUE, true);
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true);
ClientProducer producer = session.createProducer(MY_ADDRESS);
@ -715,6 +742,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -747,7 +776,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, true, true);
session.createQueue(MY_ADDRESS, MY_QUEUE, true);
session.createQueue(MY_ADDRESS, RoutingType.MULTICAST, MY_QUEUE, true);
ClientProducer producer = session.createProducer(MY_ADDRESS);
@ -793,6 +822,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(MY_QUEUE);
@ -824,7 +855,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
public void testTransactional() throws Exception {
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
@ -850,6 +881,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session, managementSession);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -867,7 +900,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, true, true);
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
@ -894,6 +927,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session, managementSession);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -916,7 +951,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, true, true);
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
session.createQueue(QUEUE_NAME, RoutingType.MULTICAST, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
@ -948,6 +983,8 @@ public class XmlImportExportTest extends ActiveMQTestBase {
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session, managementSession);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
@ -963,4 +1000,44 @@ public class XmlImportExportTest extends ActiveMQTestBase {
locator.close();
server.stop();
}
@Test
public void testRoutingTypes() throws Exception {
SimpleString myAddress = SimpleString.toSimpleString("myAddress");
ClientSession session = basicSetUp();
Set<RoutingType> routingTypes = new HashSet<>();
routingTypes.add(RoutingType.ANYCAST);
routingTypes.add(RoutingType.MULTICAST);
session.createAddress(myAddress, routingTypes, false);
session.createQueue(myAddress.toString(), RoutingType.MULTICAST, "myQueue1", true);
session.createQueue(myAddress.toString(), RoutingType.MULTICAST, "myQueue2", true);
locator.close();
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
server.start();
checkForLongs();
locator = createInVMNonHALocator();
factory = locator.createSessionFactory();
session = factory.createSession(false, false, true);
ClientSession managementSession = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.validate(xmlInputStream);
xmlInputStream.reset();
xmlDataImporter.process(xmlInputStream, session, managementSession);
assertTrue(server.getAddressInfo(myAddress).getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(server.getAddressInfo(myAddress).getRoutingTypes().contains(RoutingType.MULTICAST));
}
}