This closes #791

This commit is contained in:
Clebert Suconic 2016-09-23 12:15:13 -04:00
commit 67ce44fc0d
3 changed files with 352 additions and 65 deletions

View File

@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
/** /**
* Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and * Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
@ -65,6 +66,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
public final class XmlDataImporter extends ActionAbstract { public final class XmlDataImporter extends ActionAbstract {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
private static final Logger logger = Logger.getLogger(XmlDataImporter.class);
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private XMLStreamReader reader; private XMLStreamReader reader;
@ -192,7 +195,9 @@ public final class XmlDataImporter extends ActionAbstract {
private void processXml() throws Exception { private void processXml() throws Exception {
try { try {
while (reader.hasNext()) { while (reader.hasNext()) {
ActiveMQServerLogger.LOGGER.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] "); if (logger.isDebugEnabled()) {
logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
}
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) { if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) { if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) {
bindQueue(); bindQueue();
@ -334,12 +339,16 @@ public final class XmlDataImporter extends ActionAbstract {
ClientMessage managementMessage = managementSession.createMessage(false); ClientMessage managementMessage = managementSession.createMessage(false);
ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID"); ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
managementSession.start(); managementSession.start();
ActiveMQServerLogger.LOGGER.debug("Requesting ID for: " + queue); if (logger.isDebugEnabled()) {
logger.debug("Requesting ID for: " + queue);
}
ClientMessage reply = requestor.request(managementMessage); ClientMessage reply = requestor.request(managementMessage);
Number idObject = (Number) ManagementHelper.getResult(reply); Number idObject = (Number) ManagementHelper.getResult(reply);
queueID = idObject.longValue(); queueID = idObject.longValue();
requestor.close(); requestor.close();
ActiveMQServerLogger.LOGGER.debug("ID for " + queue + " is: " + queueID); if (logger.isDebugEnabled()) {
logger.debug("ID for " + queue + " is: " + queueID);
}
queueIDs.put(queue, queueID); // store it so we don't have to look it up every time queueIDs.put(queue, queueID); // store it so we don't have to look it up every time
} }
@ -348,7 +357,9 @@ public final class XmlDataImporter extends ActionAbstract {
} }
logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
ActiveMQServerLogger.LOGGER.debug(logMessage); if (logger.isDebugEnabled()) {
logger.debug(logMessage);
}
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
try (ClientProducer producer = session.createProducer(destination)) { try (ClientProducer producer = session.createProducer(destination)) {
@ -434,7 +445,7 @@ public final class XmlDataImporter extends ActionAbstract {
} }
} }
private void processMessageBody(Message message) throws XMLStreamException, IOException { private void processMessageBody(final Message message) throws XMLStreamException, IOException {
boolean isLarge = false; boolean isLarge = false;
for (int i = 0; i < reader.getAttributeCount(); i++) { for (int i = 0; i < reader.getAttributeCount(); i++) {
@ -444,33 +455,65 @@ public final class XmlDataImporter extends ActionAbstract {
} }
} }
reader.next(); reader.next();
if (logger.isDebugEnabled()) {
logger.debug("XMLStreamReader impl: " + reader);
}
if (isLarge) { if (isLarge) {
tempFileName = UUID.randomUUID().toString() + ".tmp"; tempFileName = UUID.randomUUID().toString() + ".tmp";
ActiveMQServerLogger.LOGGER.debug("Creating temp file " + tempFileName + " for large message."); if (logger.isDebugEnabled()) {
logger.debug("Creating temp file " + tempFileName + " for large message.");
}
try (OutputStream out = new FileOutputStream(tempFileName)) { try (OutputStream out = new FileOutputStream(tempFileName)) {
while (reader.hasNext()) { getMessageBodyBytes(new MessageBodyBytesProcessor() {
if (reader.getEventType() == XMLStreamConstants.END_ELEMENT) { @Override
break; public void processBodyBytes(byte[] bytes) throws IOException {
} out.write(bytes);
else {
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
String trimmedCharacters = characters.trim();
if (trimmedCharacters.length() > 0) { // this will skip "indentation" characters
byte[] data = decode(trimmedCharacters);
out.write(data);
}
}
reader.next();
} }
});
} }
FileInputStream fileInputStream = new FileInputStream(tempFileName); FileInputStream fileInputStream = new FileInputStream(tempFileName);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
((ClientMessage) message).setBodyInputStream(bufferedInput); ((ClientMessage) message).setBodyInputStream(bufferedInput);
} }
else { else {
reader.next(); // step past the "indentation" characters to get to the CDATA with the message body getMessageBodyBytes(new MessageBodyBytesProcessor() {
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()); @Override
message.getBodyBuffer().writeBytes(decode(characters.trim())); public void processBodyBytes(byte[] bytes) throws IOException {
message.getBodyBuffer().writeBytes(bytes);
}
});
}
}
/**
* Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
* read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
* to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
* CDATA has to be decoded in its entirety.
*
* @param processor used to deal with the decoded CDATA elements
* @throws IOException
* @throws XMLStreamException
*/
private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
int currentEventType;
StringBuilder cdata = new StringBuilder();
while (reader.hasNext()) {
currentEventType = reader.getEventType();
if (currentEventType == XMLStreamConstants.END_ELEMENT) {
break;
}
/* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
* the processor, and reset the cdata for the next event(s)
*/
else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
processor.processBodyBytes(decode(cdata.toString()));
cdata.setLength(0);
}
else {
cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
}
reader.next();
} }
} }
@ -498,10 +541,14 @@ public final class XmlDataImporter extends ActionAbstract {
if (!queueQuery.isExists()) { if (!queueQuery.isExists()) {
session.createQueue(address, queueName, filter, true); session.createQueue(address, queueName, filter, true);
ActiveMQServerLogger.LOGGER.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")"); if (logger.isDebugEnabled()) {
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
} }
else { else {
ActiveMQServerLogger.LOGGER.debug("Binding " + queueName + " already exists so won't re-bind."); if (logger.isDebugEnabled()) {
logger.debug("Binding " + queueName + " already exists so won't re-bind.");
}
} }
addressMap.put(queueName, address); addressMap.put(queueName, address);
@ -601,123 +648,183 @@ public final class XmlDataImporter extends ActionAbstract {
case XMLStreamConstants.START_ELEMENT: case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) { if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) {
callFailoverTimeout = reader.getElementText(); callFailoverTimeout = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) {
callTimeout = reader.getElementText(); callTimeout = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory callTimeout: " + callTimeout); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory callTimeout: " + callTimeout);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) {
clientFailureCheckPeriod = reader.getElementText(); clientFailureCheckPeriod = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) {
clientId = reader.getElementText(); clientId = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientId: " + clientId); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory clientId: " + clientId);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) {
confirmationWindowSize = reader.getElementText(); confirmationWindowSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) {
connectionTtl = reader.getElementText(); connectionTtl = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory connectionTtl: " + connectionTtl); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory connectionTtl: " + connectionTtl);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
connectors = getConnectors(); connectors = getConnectors();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory getLocalName: " + connectors); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory getLocalName: " + connectors);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) {
consumerMaxRate = reader.getElementText(); consumerMaxRate = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) {
consumerWindowSize = reader.getElementText(); consumerWindowSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) {
discoveryGroupName = reader.getElementText(); discoveryGroupName = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) {
dupsOkBatchSize = reader.getElementText(); dupsOkBatchSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) {
groupId = reader.getElementText(); groupId = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory groupId: " + groupId); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory groupId: " + groupId);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) {
loadBalancingPolicyClassName = reader.getElementText(); loadBalancingPolicyClassName = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) {
maxRetryInterval = reader.getElementText(); maxRetryInterval = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) {
minLargeMessageSize = reader.getElementText(); minLargeMessageSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) {
name = reader.getElementText(); name = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory name: " + name); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory name: " + name);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) {
producerMaxRate = reader.getElementText(); producerMaxRate = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerMaxRate: " + producerMaxRate); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory producerMaxRate: " + producerMaxRate);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) {
producerWindowSize = reader.getElementText(); producerWindowSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerWindowSize: " + producerWindowSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory producerWindowSize: " + producerWindowSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) {
reconnectAttempts = reader.getElementText(); reconnectAttempts = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) {
retryInterval = reader.getElementText(); retryInterval = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryInterval: " + retryInterval); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory retryInterval: " + retryInterval);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) {
retryIntervalMultiplier = reader.getElementText(); retryIntervalMultiplier = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
scheduledThreadMaxPoolSize = reader.getElementText(); scheduledThreadMaxPoolSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
threadMaxPoolSize = reader.getElementText(); threadMaxPoolSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) {
transactionBatchSize = reader.getElementText(); transactionBatchSize = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) {
type = reader.getElementText(); type = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory type: " + type); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory type: " + type);
}
} }
else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
entries = getEntries(); entries = getEntries();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory entries: " + entries); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory entries: " + entries);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) {
autoGroup = reader.getElementText(); autoGroup = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory autoGroup: " + autoGroup); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory autoGroup: " + autoGroup);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) {
blockOnAcknowledge = reader.getElementText(); blockOnAcknowledge = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) {
blockOnDurableSend = reader.getElementText(); blockOnDurableSend = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) {
blockOnNonDurableSend = reader.getElementText(); blockOnNonDurableSend = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) {
cacheLargeMessagesClient = reader.getElementText(); cacheLargeMessagesClient = reader.getElementText();
@ -725,23 +832,33 @@ public final class XmlDataImporter extends ActionAbstract {
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) {
compressLargeMessages = reader.getElementText(); compressLargeMessages = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) {
failoverOnInitialConnection = reader.getElementText(); failoverOnInitialConnection = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) {
ha = reader.getElementText(); ha = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory ha: " + ha); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory ha: " + ha);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) {
preacknowledge = reader.getElementText(); preacknowledge = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory preacknowledge: " + preacknowledge); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory preacknowledge: " + preacknowledge);
}
} }
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) {
useGlobalPools = reader.getElementText(); useGlobalPools = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS connection factory useGlobalPools: " + useGlobalPools); if (logger.isDebugEnabled()) {
logger.debug("JMS connection factory useGlobalPools: " + useGlobalPools);
}
} }
break; break;
case XMLStreamConstants.END_ELEMENT: case XMLStreamConstants.END_ELEMENT:
@ -763,7 +880,9 @@ public final class XmlDataImporter extends ActionAbstract {
managementSession.start(); managementSession.start();
ClientMessage reply = requestor.request(managementMessage); ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) { if (ManagementHelper.hasOperationSucceeded(reply)) {
ActiveMQServerLogger.LOGGER.debug("Created connection factory " + name); if (logger.isDebugEnabled()) {
logger.debug("Created connection factory " + name);
}
} }
else { else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name); ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
@ -785,15 +904,21 @@ public final class XmlDataImporter extends ActionAbstract {
case XMLStreamConstants.START_ELEMENT: case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) { if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) {
name = reader.getElementText(); name = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS destination name: " + name); if (logger.isDebugEnabled()) {
logger.debug("JMS destination name: " + name);
}
} }
else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) {
selector = reader.getElementText(); selector = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS destination selector: " + selector); if (logger.isDebugEnabled()) {
logger.debug("JMS destination selector: " + selector);
}
} }
else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) {
type = reader.getElementText(); type = reader.getElementText();
ActiveMQServerLogger.LOGGER.debug("JMS destination type: " + type); if (logger.isDebugEnabled()) {
logger.debug("JMS destination type: " + type);
}
} }
else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) { else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
entries = getEntries(); entries = getEntries();
@ -822,7 +947,9 @@ public final class XmlDataImporter extends ActionAbstract {
managementSession.start(); managementSession.start();
ClientMessage reply = requestor.request(managementMessage); ClientMessage reply = requestor.request(managementMessage);
if (ManagementHelper.hasOperationSucceeded(reply)) { if (ManagementHelper.hasOperationSucceeded(reply)) {
ActiveMQServerLogger.LOGGER.debug("Created " + type.toLowerCase() + " " + name); if (logger.isDebugEnabled()) {
logger.debug("Created " + type.toLowerCase() + " " + name);
}
} }
else { else {
ActiveMQServerLogger.LOGGER.error("Problem creating " + name); ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
@ -842,7 +969,9 @@ public final class XmlDataImporter extends ActionAbstract {
if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName())) { if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName())) {
String elementText = reader.getElementText(); String elementText = reader.getElementText();
entry.append(elementText).append(", "); entry.append(elementText).append(", ");
ActiveMQServerLogger.LOGGER.debug("JMS admin object JNDI entry: " + entry.toString()); if (logger.isDebugEnabled()) {
logger.debug("JMS admin object JNDI entry: " + entry.toString());
}
} }
break; break;
case XMLStreamConstants.END_ELEMENT: case XMLStreamConstants.END_ELEMENT:
@ -897,6 +1026,11 @@ public final class XmlDataImporter extends ActionAbstract {
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
} }
private interface MessageBodyBytesProcessor {
void processBodyBytes(byte[] bytes) throws IOException;
}
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------
} }

View File

@ -376,6 +376,18 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.codehaus.woodstox</groupId>
<artifactId>woodstox-core-asl</artifactId>
<version>4.4.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>javax.xml.stream</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -18,15 +18,18 @@ package org.apache.activemq.artemis.tests.integration.persistence;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -38,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
@ -52,6 +56,7 @@ import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.unit.util.InVMContext; import org.apache.activemq.artemis.tests.unit.util.InVMContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -259,6 +264,94 @@ public class XmlImportExportTest extends ActiveMQTestBase {
assertEquals(Message.DEFAULT_TYPE, msg.getType()); assertEquals(Message.DEFAULT_TYPE, msg.getType());
} }
@Test
public void testTextMessage() throws Exception {
StringBuilder data = new StringBuilder();
for (int i = 0; i < 2608; i++) {
data.append("X");
}
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true);
msg.getBodyBuffer().writeString(data.toString());
producer.send(msg);
session.close();
locator.close();
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
server.start();
checkForLongs();
locator = createInVMNonHALocator();
factory = createSessionFactory(locator);
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
msg = consumer.receive(CONSUMER_TIMEOUT);
assertEquals(Message.TEXT_TYPE, msg.getType());
assertEquals(data.toString(), msg.getBodyBuffer().readString());
}
@Test
public void testBytesMessage() throws Exception {
StringBuilder data = new StringBuilder();
for (int i = 0; i < 2610; i++) {
data.append("X");
}
ClientSession session = basicSetUp();
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
ClientProducer producer = session.createProducer(QUEUE_NAME);
ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
msg.getBodyBuffer().writeBytes(data.toString().getBytes());
producer.send(msg);
session.close();
locator.close();
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
server.start();
checkForLongs();
locator = createInVMNonHALocator();
factory = createSessionFactory(locator);
session = factory.createSession(false, true, true);
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(xmlInputStream, session);
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
session.start();
msg = consumer.receive(CONSUMER_TIMEOUT);
assertEquals(Message.BYTES_TYPE, msg.getType());
byte[] result = new byte[msg.getBodySize()];
msg.getBodyBuffer().readBytes(result);
assertEquals(data.toString().getBytes().length, result.length);
}
@Test @Test
public void testMessageAttributes() throws Exception { public void testMessageAttributes() throws Exception {
@ -570,6 +663,54 @@ public class XmlImportExportTest extends ActiveMQTestBase {
session.commit(); session.commit();
} }
@Test
public void testLargeJmsTextMessage() throws Exception {
basicSetUp();
ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test");
Connection c = cf.createConnection();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A"));
p.setDeliveryMode(DeliveryMode.PERSISTENT);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 1024 * 200; i++) {
stringBuilder.append(RandomUtil.randomChar());
}
TextMessage textMessage = s.createTextMessage(stringBuilder.toString());
textMessage.setStringProperty("_AMQ_DUPL_ID", String.valueOf(UUID.randomUUID()));
p.send(textMessage);
c.close();
locator.close();
server.stop();
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
XmlDataExporter xmlDataExporter = new XmlDataExporter();
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
System.out.print(new String(xmlOutputStream.toByteArray()));
clearDataRecreateServerDirs();
server.start();
checkForLongs();
locator = createInVMNonHALocator();
factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, true, true);
ByteArrayInputStream inputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
XmlDataImporter xmlDataImporter = new XmlDataImporter();
xmlDataImporter.process(inputStream, session);
session.close();
c = cf.createConnection();
s = c.createSession();
MessageConsumer mc = s.createConsumer(ActiveMQJMSClient.createQueue("A"));
c.start();
javax.jms.Message msg = mc.receive(CONSUMER_TIMEOUT);
assertNotNull(msg);
c.close();
}
@Test @Test
public void testPartialQueue() throws Exception { public void testPartialQueue() throws Exception {
ClientSession session = basicSetUp(); ClientSession session = basicSetUp();