ARTEMIS-747 multiple CDATA events on import fails
This commit is contained in:
parent
42c7080941
commit
ea552a1f88
|
@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.utils.Base64;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Read XML output from <code>org.apache.activemq.artemis.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
|
||||
|
@ -65,6 +66,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
|
|||
public final class XmlDataImporter extends ActionAbstract {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
private static final Logger logger = Logger.getLogger(XmlDataImporter.class);
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
private XMLStreamReader reader;
|
||||
|
@ -192,7 +195,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
private void processXml() throws Exception {
|
||||
try {
|
||||
while (reader.hasNext()) {
|
||||
ActiveMQServerLogger.LOGGER.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
|
||||
}
|
||||
if (reader.getEventType() == XMLStreamConstants.START_ELEMENT) {
|
||||
if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName())) {
|
||||
bindQueue();
|
||||
|
@ -334,12 +339,16 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
ClientMessage managementMessage = managementSession.createMessage(false);
|
||||
ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
|
||||
managementSession.start();
|
||||
ActiveMQServerLogger.LOGGER.debug("Requesting ID for: " + queue);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Requesting ID for: " + queue);
|
||||
}
|
||||
ClientMessage reply = requestor.request(managementMessage);
|
||||
Number idObject = (Number) ManagementHelper.getResult(reply);
|
||||
queueID = idObject.longValue();
|
||||
requestor.close();
|
||||
ActiveMQServerLogger.LOGGER.debug("ID for " + queue + " is: " + queueID);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("ID for " + queue + " is: " + queueID);
|
||||
}
|
||||
queueIDs.put(queue, queueID); // store it so we don't have to look it up every time
|
||||
}
|
||||
|
||||
|
@ -348,7 +357,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
|
||||
logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
|
||||
ActiveMQServerLogger.LOGGER.debug(logMessage);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(logMessage);
|
||||
}
|
||||
|
||||
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
|
||||
try (ClientProducer producer = session.createProducer(destination)) {
|
||||
|
@ -434,7 +445,7 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
private void processMessageBody(Message message) throws XMLStreamException, IOException {
|
||||
private void processMessageBody(final Message message) throws XMLStreamException, IOException {
|
||||
boolean isLarge = false;
|
||||
|
||||
for (int i = 0; i < reader.getAttributeCount(); i++) {
|
||||
|
@ -444,33 +455,65 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
}
|
||||
reader.next();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("XMLStreamReader impl: " + reader);
|
||||
}
|
||||
if (isLarge) {
|
||||
tempFileName = UUID.randomUUID().toString() + ".tmp";
|
||||
ActiveMQServerLogger.LOGGER.debug("Creating temp file " + tempFileName + " for large message.");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Creating temp file " + tempFileName + " for large message.");
|
||||
}
|
||||
try (OutputStream out = new FileOutputStream(tempFileName)) {
|
||||
while (reader.hasNext()) {
|
||||
if (reader.getEventType() == XMLStreamConstants.END_ELEMENT) {
|
||||
break;
|
||||
}
|
||||
else {
|
||||
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
|
||||
String trimmedCharacters = characters.trim();
|
||||
if (trimmedCharacters.length() > 0) { // this will skip "indentation" characters
|
||||
byte[] data = decode(trimmedCharacters);
|
||||
out.write(data);
|
||||
}
|
||||
}
|
||||
reader.next();
|
||||
getMessageBodyBytes(new MessageBodyBytesProcessor() {
|
||||
@Override
|
||||
public void processBodyBytes(byte[] bytes) throws IOException {
|
||||
out.write(bytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
FileInputStream fileInputStream = new FileInputStream(tempFileName);
|
||||
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
|
||||
((ClientMessage) message).setBodyInputStream(bufferedInput);
|
||||
}
|
||||
else {
|
||||
reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
|
||||
String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
|
||||
message.getBodyBuffer().writeBytes(decode(characters.trim()));
|
||||
getMessageBodyBytes(new MessageBodyBytesProcessor() {
|
||||
@Override
|
||||
public void processBodyBytes(byte[] bytes) throws IOException {
|
||||
message.getBodyBuffer().writeBytes(bytes);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
|
||||
* read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
|
||||
* to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
|
||||
* CDATA has to be decoded in its entirety.
|
||||
*
|
||||
* @param processor used to deal with the decoded CDATA elements
|
||||
* @throws IOException
|
||||
* @throws XMLStreamException
|
||||
*/
|
||||
private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
|
||||
int currentEventType;
|
||||
StringBuilder cdata = new StringBuilder();
|
||||
while (reader.hasNext()) {
|
||||
currentEventType = reader.getEventType();
|
||||
if (currentEventType == XMLStreamConstants.END_ELEMENT) {
|
||||
break;
|
||||
}
|
||||
/* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
|
||||
* the processor, and reset the cdata for the next event(s)
|
||||
*/
|
||||
else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
|
||||
processor.processBodyBytes(decode(cdata.toString()));
|
||||
cdata.setLength(0);
|
||||
}
|
||||
else {
|
||||
cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
|
||||
}
|
||||
reader.next();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -498,10 +541,14 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
|
||||
if (!queueQuery.isExists()) {
|
||||
session.createQueue(address, queueName, filter, true);
|
||||
ActiveMQServerLogger.LOGGER.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
|
||||
}
|
||||
}
|
||||
else {
|
||||
ActiveMQServerLogger.LOGGER.debug("Binding " + queueName + " already exists so won't re-bind.");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Binding " + queueName + " already exists so won't re-bind.");
|
||||
}
|
||||
}
|
||||
|
||||
addressMap.put(queueName, address);
|
||||
|
@ -601,123 +648,183 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
case XMLStreamConstants.START_ELEMENT:
|
||||
if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT.equals(reader.getLocalName())) {
|
||||
callFailoverTimeout = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory callFailoverTimeout: " + callFailoverTimeout);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CALL_TIMEOUT.equals(reader.getLocalName())) {
|
||||
callTimeout = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory callTimeout: " + callTimeout);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory callTimeout: " + callTimeout);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD.equals(reader.getLocalName())) {
|
||||
clientFailureCheckPeriod = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory clientFailureCheckPeriod: " + clientFailureCheckPeriod);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CLIENT_ID.equals(reader.getLocalName())) {
|
||||
clientId = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory clientId: " + clientId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory clientId: " + clientId);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE.equals(reader.getLocalName())) {
|
||||
confirmationWindowSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory confirmationWindowSize: " + confirmationWindowSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTION_TTL.equals(reader.getLocalName())) {
|
||||
connectionTtl = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory connectionTtl: " + connectionTtl);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory connectionTtl: " + connectionTtl);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONNECTOR.equals(reader.getLocalName())) {
|
||||
connectors = getConnectors();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory getLocalName: " + connectors);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory getLocalName: " + connectors);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE.equals(reader.getLocalName())) {
|
||||
consumerMaxRate = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory consumerMaxRate: " + consumerMaxRate);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE.equals(reader.getLocalName())) {
|
||||
consumerWindowSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory consumerWindowSize: " + consumerWindowSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME.equals(reader.getLocalName())) {
|
||||
discoveryGroupName = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory discoveryGroupName: " + discoveryGroupName);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE.equals(reader.getLocalName())) {
|
||||
dupsOkBatchSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory dupsOkBatchSize: " + dupsOkBatchSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_GROUP_ID.equals(reader.getLocalName())) {
|
||||
groupId = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory groupId: " + groupId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory groupId: " + groupId);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME.equals(reader.getLocalName())) {
|
||||
loadBalancingPolicyClassName = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory loadBalancingPolicyClassName: " + loadBalancingPolicyClassName);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL.equals(reader.getLocalName())) {
|
||||
maxRetryInterval = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory maxRetryInterval: " + maxRetryInterval);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE.equals(reader.getLocalName())) {
|
||||
minLargeMessageSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory minLargeMessageSize: " + minLargeMessageSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_NAME.equals(reader.getLocalName())) {
|
||||
name = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory name: " + name);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory name: " + name);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE.equals(reader.getLocalName())) {
|
||||
producerMaxRate = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerMaxRate: " + producerMaxRate);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory producerMaxRate: " + producerMaxRate);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE.equals(reader.getLocalName())) {
|
||||
producerWindowSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory producerWindowSize: " + producerWindowSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory producerWindowSize: " + producerWindowSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS.equals(reader.getLocalName())) {
|
||||
reconnectAttempts = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory reconnectAttempts: " + reconnectAttempts);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL.equals(reader.getLocalName())) {
|
||||
retryInterval = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryInterval: " + retryInterval);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory retryInterval: " + retryInterval);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER.equals(reader.getLocalName())) {
|
||||
retryIntervalMultiplier = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory retryIntervalMultiplier: " + retryIntervalMultiplier);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
|
||||
scheduledThreadMaxPoolSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory scheduledThreadMaxPoolSize: " + scheduledThreadMaxPoolSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE.equals(reader.getLocalName())) {
|
||||
threadMaxPoolSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory threadMaxPoolSize: " + threadMaxPoolSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE.equals(reader.getLocalName())) {
|
||||
transactionBatchSize = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory transactionBatchSize: " + transactionBatchSize);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_TYPE.equals(reader.getLocalName())) {
|
||||
type = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory type: " + type);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory type: " + type);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
|
||||
entries = getEntries();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory entries: " + entries);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory entries: " + entries);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_AUTO_GROUP.equals(reader.getLocalName())) {
|
||||
autoGroup = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory autoGroup: " + autoGroup);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory autoGroup: " + autoGroup);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE.equals(reader.getLocalName())) {
|
||||
blockOnAcknowledge = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory blockOnAcknowledge: " + blockOnAcknowledge);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND.equals(reader.getLocalName())) {
|
||||
blockOnDurableSend = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory blockOnDurableSend: " + blockOnDurableSend);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND.equals(reader.getLocalName())) {
|
||||
blockOnNonDurableSend = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory blockOnNonDurableSend: " + blockOnNonDurableSend);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT.equals(reader.getLocalName())) {
|
||||
cacheLargeMessagesClient = reader.getElementText();
|
||||
|
@ -725,23 +832,33 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES.equals(reader.getLocalName())) {
|
||||
compressLargeMessages = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory compressLargeMessages: " + compressLargeMessages);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION.equals(reader.getLocalName())) {
|
||||
failoverOnInitialConnection = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory failoverOnInitialConnection: " + failoverOnInitialConnection);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_HA.equals(reader.getLocalName())) {
|
||||
ha = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory ha: " + ha);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory ha: " + ha);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_PREACKNOWLEDGE.equals(reader.getLocalName())) {
|
||||
preacknowledge = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory preacknowledge: " + preacknowledge);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory preacknowledge: " + preacknowledge);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS.equals(reader.getLocalName())) {
|
||||
useGlobalPools = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS connection factory useGlobalPools: " + useGlobalPools);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS connection factory useGlobalPools: " + useGlobalPools);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case XMLStreamConstants.END_ELEMENT:
|
||||
|
@ -763,7 +880,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
managementSession.start();
|
||||
ClientMessage reply = requestor.request(managementMessage);
|
||||
if (ManagementHelper.hasOperationSucceeded(reply)) {
|
||||
ActiveMQServerLogger.LOGGER.debug("Created connection factory " + name);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Created connection factory " + name);
|
||||
}
|
||||
}
|
||||
else {
|
||||
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
|
||||
|
@ -785,15 +904,21 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
case XMLStreamConstants.START_ELEMENT:
|
||||
if (XmlDataConstants.JMS_DESTINATION_NAME.equals(reader.getLocalName())) {
|
||||
name = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS destination name: " + name);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS destination name: " + name);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_DESTINATION_SELECTOR.equals(reader.getLocalName())) {
|
||||
selector = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS destination selector: " + selector);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS destination selector: " + selector);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_DESTINATION_TYPE.equals(reader.getLocalName())) {
|
||||
type = reader.getElementText();
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS destination type: " + type);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS destination type: " + type);
|
||||
}
|
||||
}
|
||||
else if (XmlDataConstants.JMS_JNDI_ENTRIES.equals(reader.getLocalName())) {
|
||||
entries = getEntries();
|
||||
|
@ -822,7 +947,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
managementSession.start();
|
||||
ClientMessage reply = requestor.request(managementMessage);
|
||||
if (ManagementHelper.hasOperationSucceeded(reply)) {
|
||||
ActiveMQServerLogger.LOGGER.debug("Created " + type.toLowerCase() + " " + name);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Created " + type.toLowerCase() + " " + name);
|
||||
}
|
||||
}
|
||||
else {
|
||||
ActiveMQServerLogger.LOGGER.error("Problem creating " + name);
|
||||
|
@ -842,7 +969,9 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
if (XmlDataConstants.JMS_JNDI_ENTRY.equals(reader.getLocalName())) {
|
||||
String elementText = reader.getElementText();
|
||||
entry.append(elementText).append(", ");
|
||||
ActiveMQServerLogger.LOGGER.debug("JMS admin object JNDI entry: " + entry.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JMS admin object JNDI entry: " + entry.toString());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case XMLStreamConstants.END_ELEMENT:
|
||||
|
@ -897,6 +1026,11 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
||||
}
|
||||
|
||||
private interface MessageBodyBytesProcessor {
|
||||
|
||||
void processBodyBytes(byte[] bytes) throws IOException;
|
||||
}
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
}
|
||||
|
|
|
@ -376,6 +376,18 @@
|
|||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.codehaus.woodstox</groupId>
|
||||
<artifactId>woodstox-core-asl</artifactId>
|
||||
<version>4.4.0</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>javax.xml.stream</groupId>
|
||||
<artifactId>stax-api</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -18,15 +18,18 @@ package org.apache.activemq.artemis.tests.integration.persistence;
|
|||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -38,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
|
||||
|
@ -52,6 +56,7 @@ import org.apache.activemq.artemis.jms.server.JMSServerManager;
|
|||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||
import org.apache.activemq.artemis.tests.unit.util.InVMContext;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -259,6 +264,94 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
|||
assertEquals(Message.DEFAULT_TYPE, msg.getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTextMessage() throws Exception {
|
||||
StringBuilder data = new StringBuilder();
|
||||
for (int i = 0; i < 2608; i++) {
|
||||
data.append("X");
|
||||
}
|
||||
|
||||
ClientSession session = basicSetUp();
|
||||
|
||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
||||
|
||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||
ClientMessage msg = session.createMessage(Message.TEXT_TYPE, true);
|
||||
msg.getBodyBuffer().writeString(data.toString());
|
||||
producer.send(msg);
|
||||
|
||||
session.close();
|
||||
locator.close();
|
||||
server.stop();
|
||||
|
||||
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
|
||||
XmlDataExporter xmlDataExporter = new XmlDataExporter();
|
||||
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
|
||||
System.out.print(new String(xmlOutputStream.toByteArray()));
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
server.start();
|
||||
checkForLongs();
|
||||
locator = createInVMNonHALocator();
|
||||
factory = createSessionFactory(locator);
|
||||
session = factory.createSession(false, true, true);
|
||||
|
||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||
xmlDataImporter.process(xmlInputStream, session);
|
||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||
session.start();
|
||||
|
||||
msg = consumer.receive(CONSUMER_TIMEOUT);
|
||||
assertEquals(Message.TEXT_TYPE, msg.getType());
|
||||
assertEquals(data.toString(), msg.getBodyBuffer().readString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBytesMessage() throws Exception {
|
||||
StringBuilder data = new StringBuilder();
|
||||
for (int i = 0; i < 2610; i++) {
|
||||
data.append("X");
|
||||
}
|
||||
|
||||
ClientSession session = basicSetUp();
|
||||
|
||||
session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
|
||||
|
||||
ClientProducer producer = session.createProducer(QUEUE_NAME);
|
||||
ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
|
||||
msg.getBodyBuffer().writeBytes(data.toString().getBytes());
|
||||
producer.send(msg);
|
||||
|
||||
session.close();
|
||||
locator.close();
|
||||
server.stop();
|
||||
|
||||
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
|
||||
XmlDataExporter xmlDataExporter = new XmlDataExporter();
|
||||
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
|
||||
System.out.print(new String(xmlOutputStream.toByteArray()));
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
server.start();
|
||||
checkForLongs();
|
||||
locator = createInVMNonHALocator();
|
||||
factory = createSessionFactory(locator);
|
||||
session = factory.createSession(false, true, true);
|
||||
|
||||
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||
xmlDataImporter.process(xmlInputStream, session);
|
||||
ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
|
||||
session.start();
|
||||
|
||||
msg = consumer.receive(CONSUMER_TIMEOUT);
|
||||
assertEquals(Message.BYTES_TYPE, msg.getType());
|
||||
byte[] result = new byte[msg.getBodySize()];
|
||||
msg.getBodyBuffer().readBytes(result);
|
||||
assertEquals(data.toString().getBytes().length, result.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageAttributes() throws Exception {
|
||||
|
||||
|
@ -570,6 +663,54 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
|||
session.commit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeJmsTextMessage() throws Exception {
|
||||
basicSetUp();
|
||||
ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory("vm://0", "test");
|
||||
Connection c = cf.createConnection();
|
||||
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer p = s.createProducer(ActiveMQJMSClient.createQueue("A"));
|
||||
p.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
for (int i = 0; i < 1024 * 200; i++) {
|
||||
stringBuilder.append(RandomUtil.randomChar());
|
||||
}
|
||||
TextMessage textMessage = s.createTextMessage(stringBuilder.toString());
|
||||
textMessage.setStringProperty("_AMQ_DUPL_ID", String.valueOf(UUID.randomUUID()));
|
||||
p.send(textMessage);
|
||||
c.close();
|
||||
|
||||
locator.close();
|
||||
server.stop();
|
||||
|
||||
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
|
||||
XmlDataExporter xmlDataExporter = new XmlDataExporter();
|
||||
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
|
||||
System.out.print(new String(xmlOutputStream.toByteArray()));
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
server.start();
|
||||
checkForLongs();
|
||||
locator = createInVMNonHALocator();
|
||||
factory = createSessionFactory(locator);
|
||||
ClientSession session = factory.createSession(false, true, true);
|
||||
|
||||
ByteArrayInputStream inputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
|
||||
XmlDataImporter xmlDataImporter = new XmlDataImporter();
|
||||
xmlDataImporter.process(inputStream, session);
|
||||
session.close();
|
||||
|
||||
c = cf.createConnection();
|
||||
s = c.createSession();
|
||||
MessageConsumer mc = s.createConsumer(ActiveMQJMSClient.createQueue("A"));
|
||||
c.start();
|
||||
javax.jms.Message msg = mc.receive(CONSUMER_TIMEOUT);
|
||||
|
||||
assertNotNull(msg);
|
||||
|
||||
c.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialQueue() throws Exception {
|
||||
ClientSession session = basicSetUp();
|
||||
|
|
Loading…
Reference in New Issue