Revert "[ARTEMIS-3113]: Artemis AMQP shouldn't depend on JMS."
This reverts commit 5079ad1019
.
This commit is contained in:
parent
6b1d534128
commit
7f64822591
|
@ -1,56 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.utils;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
||||
public class DestinationUtil {
|
||||
|
||||
private static final char SEPARATOR = '.';
|
||||
|
||||
private static String escape(final String input) {
|
||||
if (input == null) {
|
||||
return "";
|
||||
}
|
||||
return input.replace("\\", "\\\\").replace(".", "\\.");
|
||||
}
|
||||
|
||||
public static SimpleString createQueueNameForSubscription(final boolean isDurable,
|
||||
final String clientID,
|
||||
final String subscriptionName) {
|
||||
final String queueName;
|
||||
if (clientID != null) {
|
||||
if (isDurable) {
|
||||
queueName = escape(clientID) + SEPARATOR +
|
||||
escape(subscriptionName);
|
||||
} else {
|
||||
queueName = "nonDurable" + SEPARATOR +
|
||||
escape(clientID) + SEPARATOR +
|
||||
escape(subscriptionName);
|
||||
}
|
||||
} else {
|
||||
if (isDurable) {
|
||||
queueName = escape(subscriptionName);
|
||||
} else {
|
||||
queueName = "nonDurable" + SEPARATOR +
|
||||
escape(subscriptionName);
|
||||
}
|
||||
}
|
||||
return SimpleString.toSimpleString(queueName);
|
||||
}
|
||||
|
||||
}
|
|
@ -31,7 +31,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.jndi.JNDIStorable;
|
||||
import org.apache.activemq.artemis.utils.DestinationUtil;
|
||||
|
||||
/**
|
||||
* ActiveMQ Artemis implementation of a JMS Destination.
|
||||
|
@ -43,8 +42,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
|
|||
|
||||
private static final long serialVersionUID = 5027962425462382883L;
|
||||
|
||||
// INFO: These variables are duplicated as part of AMQPMessageSupport in artemis-amqp-protocols
|
||||
// The duplication there is to avoid a dependency on this module from a server's module
|
||||
public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
|
||||
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
|
||||
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
|
||||
|
@ -170,7 +167,25 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
|
|||
public static SimpleString createQueueNameForSubscription(final boolean isDurable,
|
||||
final String clientID,
|
||||
final String subscriptionName) {
|
||||
return DestinationUtil.createQueueNameForSubscription(isDurable, clientID, subscriptionName);
|
||||
final String queueName;
|
||||
if (clientID != null) {
|
||||
if (isDurable) {
|
||||
queueName = ActiveMQDestination.escape(clientID) + SEPARATOR +
|
||||
ActiveMQDestination.escape(subscriptionName);
|
||||
} else {
|
||||
queueName = "nonDurable" + SEPARATOR +
|
||||
ActiveMQDestination.escape(clientID) + SEPARATOR +
|
||||
ActiveMQDestination.escape(subscriptionName);
|
||||
}
|
||||
} else {
|
||||
if (isDurable) {
|
||||
queueName = ActiveMQDestination.escape(subscriptionName);
|
||||
} else {
|
||||
queueName = "nonDurable" + SEPARATOR +
|
||||
ActiveMQDestination.escape(subscriptionName);
|
||||
}
|
||||
}
|
||||
return SimpleString.toSimpleString(queueName);
|
||||
}
|
||||
|
||||
public static String createQueueNameForSharedSubscription(final boolean isDurable,
|
||||
|
|
|
@ -33,11 +33,11 @@
|
|||
|
||||
<dependencies>
|
||||
<!-- JMS Client because of some conversions that are done -->
|
||||
<!-- <dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency> -->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-selector</artifactId>
|
||||
|
@ -110,6 +110,11 @@
|
|||
<artifactId>commons-collections</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jms_2.0_spec</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
||||
|
@ -46,7 +47,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
|||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
@ -102,7 +102,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
|||
* the address. This can be changed on the acceptor.
|
||||
* */
|
||||
// TODO fix this
|
||||
private String pubSubPrefix = AMQPMessageSupport.TOPIC_QUALIFIED_PREFIX;
|
||||
private String pubSubPrefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
|
||||
|
||||
private int maxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT;
|
||||
|
||||
|
|
|
@ -28,9 +28,21 @@ import java.util.Arrays;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
|
||||
|
@ -42,6 +54,7 @@ import org.apache.qpid.proton.amqp.Binary;
|
|||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Support class containing constant values and static methods that are used to map to / from
|
||||
|
@ -49,11 +62,7 @@ import org.apache.qpid.proton.message.Message;
|
|||
*/
|
||||
public final class AMQPMessageSupport {
|
||||
|
||||
// These are duplicates from ActiveMQDestination on the jms module.
|
||||
public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
|
||||
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
|
||||
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
|
||||
public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
|
||||
private static final Logger logger = Logger.getLogger(AMQPMessageSupport.class);
|
||||
|
||||
|
||||
public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS");
|
||||
|
@ -294,11 +303,34 @@ public final class AMQPMessageSupport {
|
|||
return key;
|
||||
}
|
||||
|
||||
|
||||
public static String toAddress(Destination destination) {
|
||||
try {
|
||||
if (destination instanceof ActiveMQDestination) {
|
||||
return ((ActiveMQDestination) destination).getAddress();
|
||||
} else {
|
||||
if (destination instanceof Queue) {
|
||||
return ((Queue) destination).getQueueName();
|
||||
} else if (destination instanceof Topic) {
|
||||
|
||||
return ((Topic) destination).getTopicName();
|
||||
}
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
// ActiveMQDestination (and most JMS implementations I know) will never throw an Exception here
|
||||
// this is here for compilation support (as JMS declares it), and I don't want to propagate exceptions into
|
||||
// the converter...
|
||||
// and for the possibility of who knows in the future!!!
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
|
||||
return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools));
|
||||
}
|
||||
|
||||
public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
|
||||
ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools);
|
||||
message.writeBytes(array, arrayOffset, length);
|
||||
return message;
|
||||
|
@ -316,7 +348,7 @@ public final class AMQPMessageSupport {
|
|||
return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools));
|
||||
}
|
||||
|
||||
public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
|
||||
ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools);
|
||||
message.setText(text);
|
||||
return message;
|
||||
|
@ -326,13 +358,13 @@ public final class AMQPMessageSupport {
|
|||
return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools));
|
||||
}
|
||||
|
||||
public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
|
||||
ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
|
||||
message.setSerializedForm(serializedForm);
|
||||
return message;
|
||||
}
|
||||
|
||||
public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
|
||||
ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
|
||||
message.setSerializedForm(new Binary(array, offset, length));
|
||||
return message;
|
||||
|
@ -342,7 +374,7 @@ public final class AMQPMessageSupport {
|
|||
return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools));
|
||||
}
|
||||
|
||||
public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
|
||||
ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools);
|
||||
final Set<Map.Entry<String, Object>> set = content.entrySet();
|
||||
for (Map.Entry<String, Object> entry : set) {
|
||||
|
@ -364,8 +396,7 @@ public final class AMQPMessageSupport {
|
|||
}
|
||||
|
||||
|
||||
// IMPORTANT-TODO: HOW TO GET THIS?
|
||||
/*public static byte destinationType(Destination destination) {
|
||||
public static byte destinationType(Destination destination) {
|
||||
if (destination instanceof Queue) {
|
||||
if (destination instanceof TemporaryQueue) {
|
||||
return TEMP_QUEUE_TYPE;
|
||||
|
@ -380,27 +411,22 @@ public final class AMQPMessageSupport {
|
|||
}
|
||||
}
|
||||
|
||||
return QUEUE_TYPE;
|
||||
} */
|
||||
|
||||
public static byte destinationType(String destination) {
|
||||
if (destination.startsWith(QUEUE_QUALIFIED_PREFIX)) {
|
||||
return QUEUE_TYPE;
|
||||
}
|
||||
if (destination.startsWith(TOPIC_QUALIFIED_PREFIX)) {
|
||||
return TOPIC_TYPE;
|
||||
}
|
||||
if (destination.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
|
||||
return TEMP_QUEUE_TYPE;
|
||||
}
|
||||
if (destination.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
|
||||
return TEMP_TOPIC_TYPE;
|
||||
}
|
||||
return QUEUE_TYPE;
|
||||
}
|
||||
|
||||
public static String destination(byte destinationType, String address) {
|
||||
return address;
|
||||
public static Destination destination(byte destinationType, String address) {
|
||||
switch (destinationType) {
|
||||
case TEMP_QUEUE_TYPE:
|
||||
return new ActiveMQTemporaryQueue(address, null);
|
||||
case TEMP_TOPIC_TYPE:
|
||||
return new ActiveMQTemporaryTopic(address, null);
|
||||
case TOPIC_TYPE:
|
||||
return new ActiveMQTopic(address);
|
||||
case QUEUE_TYPE:
|
||||
return new ActiveMQQueue(address);
|
||||
default:
|
||||
return new ActiveMQQueue(address);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -39,11 +39,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
|
|||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_QUALIFIED_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_QUALIFED_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_QUALIFED_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_QUALIFIED_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
|
||||
|
@ -64,9 +60,13 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
|
||||
|
@ -98,14 +98,12 @@ import org.apache.qpid.proton.codec.WritableBuffer;
|
|||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.JMSConstants.DeliveryMode_NON_PERSISTENT;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.JMSConstants.DeliveryMode_PERSISTENT;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.JMSConstants.MESSAGE_DEFAULT_PRIORITY;
|
||||
/**
|
||||
* This class was created just to separate concerns on AMQPConverter.
|
||||
* For better organization of the code.
|
||||
* */
|
||||
public class AmqpCoreConverter {
|
||||
|
||||
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
return message.toCore(coreMessageObjectPools);
|
||||
}
|
||||
|
@ -217,7 +215,7 @@ public class AmqpCoreConverter {
|
|||
// If the JMS expiration has not yet been set...
|
||||
if (header != null && result.getJMSExpiration() == 0) {
|
||||
// Then lets try to set it based on the message TTL.
|
||||
long ttl = JMSConstants.MESSAGE_DEFAULT_TIME_TO_LIVE;
|
||||
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
|
||||
if (header.getTtl() != null) {
|
||||
ttl = header.getTtl().longValue();
|
||||
}
|
||||
|
@ -244,16 +242,16 @@ public class AmqpCoreConverter {
|
|||
|
||||
if (header.getDurable() != null) {
|
||||
jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
|
||||
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode_PERSISTENT : DeliveryMode_NON_PERSISTENT);
|
||||
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||
} else {
|
||||
jms.setJMSDeliveryMode(DeliveryMode_NON_PERSISTENT);
|
||||
jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
}
|
||||
|
||||
if (header.getPriority() != null) {
|
||||
jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
|
||||
jms.setJMSPriority(header.getPriority().intValue());
|
||||
} else {
|
||||
jms.setJMSPriority(MESSAGE_DEFAULT_PRIORITY);
|
||||
jms.setJMSPriority(javax.jms.Message.DEFAULT_PRIORITY);
|
||||
}
|
||||
|
||||
if (header.getFirstAcquirer() != null) {
|
||||
|
@ -266,8 +264,8 @@ public class AmqpCoreConverter {
|
|||
jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1);
|
||||
}
|
||||
} else {
|
||||
jms.setJMSPriority((byte) MESSAGE_DEFAULT_PRIORITY);
|
||||
jms.setJMSDeliveryMode(DeliveryMode_NON_PERSISTENT);
|
||||
jms.setJMSPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
|
||||
jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
}
|
||||
|
||||
return jms;
|
||||
|
@ -339,7 +337,7 @@ public class AmqpCoreConverter {
|
|||
}
|
||||
if (properties.getTo() != null) {
|
||||
byte queueType = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
|
||||
jms.setJMSDestination(properties.getTo());
|
||||
jms.setJMSDestination(AMQPMessageSupport.destination(queueType, properties.getTo()));
|
||||
}
|
||||
if (properties.getSubject() != null) {
|
||||
jms.setJMSType(properties.getSubject());
|
||||
|
@ -349,19 +347,19 @@ public class AmqpCoreConverter {
|
|||
|
||||
switch (value) {
|
||||
case AMQPMessageSupport.QUEUE_TYPE:
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
|
||||
break;
|
||||
case AMQPMessageSupport.TEMP_QUEUE_TYPE:
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo());
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo());
|
||||
break;
|
||||
case AMQPMessageSupport.TOPIC_TYPE:
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TOPIC_QUALIFIED_PREFIX + properties.getReplyTo());
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + properties.getReplyTo());
|
||||
break;
|
||||
case AMQPMessageSupport.TEMP_TOPIC_TYPE:
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo());
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo());
|
||||
break;
|
||||
default:
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
|
||||
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -427,7 +425,7 @@ public class AmqpCoreConverter {
|
|||
return jms;
|
||||
}
|
||||
|
||||
private static void encodeUnsupportedMessagePropertyType(ServerJMSMessage jms, String key, Object value) throws Exception {
|
||||
private static void encodeUnsupportedMessagePropertyType(ServerJMSMessage jms, String key, Object value) throws JMSException {
|
||||
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer();
|
||||
final EncoderImpl encoder = TLSEncode.getEncoder();
|
||||
|
||||
|
@ -446,7 +444,7 @@ public class AmqpCoreConverter {
|
|||
}
|
||||
}
|
||||
|
||||
private static void setProperty(ServerJMSMessage msg, String key, Object value) throws Exception {
|
||||
private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
|
||||
if (value instanceof UnsignedLong) {
|
||||
long v = ((UnsignedLong) value).longValue();
|
||||
msg.setLongProperty(key, v);
|
||||
|
|
|
@ -45,9 +45,10 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
|
|||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
|
@ -58,6 +59,11 @@ import java.util.LinkedHashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageEOFException;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
|
@ -143,7 +149,7 @@ public class CoreAmqpConverter {
|
|||
header.setDurable(true);
|
||||
}
|
||||
byte priority = (byte) message.getJMSPriority();
|
||||
if (priority != JMSConstants.MESSAGE_DEFAULT_PRIORITY) {
|
||||
if (priority != javax.jms.Message.DEFAULT_PRIORITY) {
|
||||
if (header == null) {
|
||||
header = new Header();
|
||||
}
|
||||
|
@ -165,15 +171,15 @@ public class CoreAmqpConverter {
|
|||
properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
|
||||
}
|
||||
}
|
||||
SimpleString destination = message.getJMSDestination();
|
||||
Destination destination = message.getJMSDestination();
|
||||
if (destination != null) {
|
||||
properties.setTo(destination.toString());
|
||||
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(destination.toString()));
|
||||
properties.setTo(toAddress(destination));
|
||||
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(destination));
|
||||
}
|
||||
SimpleString replyTo = message.getJMSReplyTo();
|
||||
Destination replyTo = message.getJMSReplyTo();
|
||||
if (replyTo != null) {
|
||||
properties.setReplyTo(replyTo.toString());
|
||||
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString()));
|
||||
properties.setReplyTo(toAddress(replyTo));
|
||||
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo));
|
||||
}
|
||||
|
||||
Object correlationID = message.getInnerMessage().getCorrelationID();
|
||||
|
@ -387,7 +393,7 @@ public class CoreAmqpConverter {
|
|||
return decodedType;
|
||||
}
|
||||
|
||||
private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws Exception {
|
||||
private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws JMSException {
|
||||
|
||||
Section body = null;
|
||||
short orignalEncoding = AMQP_UNKNOWN;
|
||||
|
@ -419,7 +425,7 @@ public class CoreAmqpConverter {
|
|||
break;
|
||||
}
|
||||
} else if (message instanceof ServerJMSTextMessage) {
|
||||
String text = (((ServerJMSTextMessage) message).getText());
|
||||
String text = (((TextMessage) message).getText());
|
||||
|
||||
switch (orignalEncoding) {
|
||||
case AMQP_NULL:
|
||||
|
@ -450,7 +456,7 @@ public class CoreAmqpConverter {
|
|||
while (true) {
|
||||
list.add(m.readObject());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (MessageEOFException e) {
|
||||
}
|
||||
|
||||
switch (orignalEncoding) {
|
||||
|
@ -515,7 +521,7 @@ public class CoreAmqpConverter {
|
|||
return body;
|
||||
}
|
||||
|
||||
private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws Exception {
|
||||
private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
|
||||
byte[] data = new byte[(int) message.getBodyLength()];
|
||||
message.readBytes(data);
|
||||
message.reset(); // Need to reset after readBytes or future readBytes
|
||||
|
@ -523,11 +529,11 @@ public class CoreAmqpConverter {
|
|||
return new Binary(data);
|
||||
}
|
||||
|
||||
private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws Exception {
|
||||
private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
|
||||
return message.getSerializedForm();
|
||||
}
|
||||
|
||||
private static Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws Exception {
|
||||
private static Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException {
|
||||
final HashMap<String, Object> map = new LinkedHashMap<>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter;
|
||||
|
||||
public class JMSConstants {
|
||||
public static final int DeliveryMode_NON_PERSISTENT = 1;
|
||||
public static final int DeliveryMode_PERSISTENT = 2;
|
||||
|
||||
public static final long MESSAGE_DEFAULT_TIME_TO_LIVE = 0;
|
||||
|
||||
public static final int MESSAGE_DEFAULT_PRIORITY = 4;
|
||||
}
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
|
||||
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
|
||||
|
@ -43,116 +46,142 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteObje
|
|||
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteShort;
|
||||
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF;
|
||||
|
||||
public class ServerJMSBytesMessage extends ServerJMSMessage {
|
||||
public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage {
|
||||
|
||||
public ServerJMSBytesMessage(ICoreMessage message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public long getBodyLength() throws Exception {
|
||||
@Override
|
||||
public long getBodyLength() throws JMSException {
|
||||
return message.getBodyBufferSize();
|
||||
}
|
||||
|
||||
public boolean readBoolean() throws Exception {
|
||||
@Override
|
||||
public boolean readBoolean() throws JMSException {
|
||||
return bytesReadBoolean(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public byte readByte() throws Exception {
|
||||
@Override
|
||||
public byte readByte() throws JMSException {
|
||||
return bytesReadByte(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public int readUnsignedByte() throws Exception {
|
||||
@Override
|
||||
public int readUnsignedByte() throws JMSException {
|
||||
return bytesReadUnsignedByte(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public short readShort() throws Exception {
|
||||
@Override
|
||||
public short readShort() throws JMSException {
|
||||
return bytesReadShort(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public int readUnsignedShort() throws Exception {
|
||||
@Override
|
||||
public int readUnsignedShort() throws JMSException {
|
||||
return bytesReadUnsignedShort(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public char readChar() throws Exception {
|
||||
@Override
|
||||
public char readChar() throws JMSException {
|
||||
return bytesReadChar(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public int readInt() throws Exception {
|
||||
@Override
|
||||
public int readInt() throws JMSException {
|
||||
return bytesReadInt(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public long readLong() throws Exception {
|
||||
@Override
|
||||
public long readLong() throws JMSException {
|
||||
return bytesReadLong(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public float readFloat() throws Exception {
|
||||
@Override
|
||||
public float readFloat() throws JMSException {
|
||||
return bytesReadFloat(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public double readDouble() throws Exception {
|
||||
@Override
|
||||
public double readDouble() throws JMSException {
|
||||
return bytesReadDouble(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public String readUTF() throws Exception {
|
||||
@Override
|
||||
public String readUTF() throws JMSException {
|
||||
return bytesReadUTF(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
public int readBytes(byte[] value) throws Exception {
|
||||
@Override
|
||||
public int readBytes(byte[] value) throws JMSException {
|
||||
return bytesReadBytes(getReadBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public int readBytes(byte[] value, int length) throws Exception {
|
||||
@Override
|
||||
public int readBytes(byte[] value, int length) throws JMSException {
|
||||
return bytesReadBytes(getReadBodyBuffer(), value, length);
|
||||
}
|
||||
|
||||
public void writeBoolean(boolean value) throws Exception {
|
||||
@Override
|
||||
public void writeBoolean(boolean value) throws JMSException {
|
||||
bytesWriteBoolean(getWriteBodyBuffer(), value);
|
||||
|
||||
}
|
||||
|
||||
public void writeByte(byte value) throws Exception {
|
||||
@Override
|
||||
public void writeByte(byte value) throws JMSException {
|
||||
bytesWriteByte(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeShort(short value) throws Exception {
|
||||
@Override
|
||||
public void writeShort(short value) throws JMSException {
|
||||
bytesWriteShort(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeChar(char value) throws Exception {
|
||||
@Override
|
||||
public void writeChar(char value) throws JMSException {
|
||||
bytesWriteChar(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeInt(int value) throws Exception {
|
||||
@Override
|
||||
public void writeInt(int value) throws JMSException {
|
||||
bytesWriteInt(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeLong(long value) throws Exception {
|
||||
@Override
|
||||
public void writeLong(long value) throws JMSException {
|
||||
bytesWriteLong(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeFloat(float value) throws Exception {
|
||||
@Override
|
||||
public void writeFloat(float value) throws JMSException {
|
||||
bytesWriteFloat(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeDouble(double value) throws Exception {
|
||||
@Override
|
||||
public void writeDouble(double value) throws JMSException {
|
||||
bytesWriteDouble(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeUTF(String value) throws Exception {
|
||||
@Override
|
||||
public void writeUTF(String value) throws JMSException {
|
||||
bytesWriteUTF(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeBytes(byte[] value) throws Exception {
|
||||
@Override
|
||||
public void writeBytes(byte[] value) throws JMSException {
|
||||
bytesWriteBytes(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeBytes(byte[] value, int offset, int length) throws Exception {
|
||||
@Override
|
||||
public void writeBytes(byte[] value, int offset, int length) throws JMSException {
|
||||
bytesWriteBytes(getWriteBodyBuffer(), value, offset, length);
|
||||
}
|
||||
|
||||
public void writeObject(Object value) throws Exception {
|
||||
@Override
|
||||
public void writeObject(Object value) throws JMSException {
|
||||
if (!bytesWriteObject(getWriteBodyBuffer(), value)) {
|
||||
throw new Exception("Can't make conversion of " + value + " to any known type");
|
||||
throw new JMSException("Can't make conversion of " + value + " to any known type");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -169,7 +198,8 @@ public class ServerJMSBytesMessage extends ServerJMSMessage {
|
|||
|
||||
}
|
||||
|
||||
public void reset() throws Exception {
|
||||
@Override
|
||||
public void reset() throws JMSException {
|
||||
if (!message.isLargeMessage()) {
|
||||
bytesMessageReset(getReadBodyBuffer());
|
||||
bytesMessageReset(getWriteBodyBuffer());
|
||||
|
|
|
@ -16,7 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
|
||||
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.MessageFormatException;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
|
||||
|
@ -36,7 +38,7 @@ import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap;
|
|||
/**
|
||||
* ActiveMQ Artemis implementation of a JMS MapMessage.
|
||||
*/
|
||||
public final class ServerJMSMapMessage extends ServerJMSMessage {
|
||||
public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
public static final byte TYPE = Message.MAP_TYPE;
|
||||
|
@ -59,56 +61,68 @@ public final class ServerJMSMapMessage extends ServerJMSMessage {
|
|||
|
||||
// MapMessage implementation -------------------------------------
|
||||
|
||||
public void setBoolean(final String name, final boolean value) throws Exception {
|
||||
@Override
|
||||
public void setBoolean(final String name, final boolean value) throws JMSException {
|
||||
map.putBooleanProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setByte(final String name, final byte value) throws Exception {
|
||||
@Override
|
||||
public void setByte(final String name, final byte value) throws JMSException {
|
||||
map.putByteProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setShort(final String name, final short value) throws Exception {
|
||||
@Override
|
||||
public void setShort(final String name, final short value) throws JMSException {
|
||||
map.putShortProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setChar(final String name, final char value) throws Exception {
|
||||
@Override
|
||||
public void setChar(final String name, final char value) throws JMSException {
|
||||
map.putCharProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setInt(final String name, final int value) throws Exception {
|
||||
@Override
|
||||
public void setInt(final String name, final int value) throws JMSException {
|
||||
map.putIntProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setLong(final String name, final long value) throws Exception {
|
||||
@Override
|
||||
public void setLong(final String name, final long value) throws JMSException {
|
||||
map.putLongProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setFloat(final String name, final float value) throws Exception {
|
||||
@Override
|
||||
public void setFloat(final String name, final float value) throws JMSException {
|
||||
map.putFloatProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setDouble(final String name, final double value) throws Exception {
|
||||
@Override
|
||||
public void setDouble(final String name, final double value) throws JMSException {
|
||||
map.putDoubleProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setString(final String name, final String value) throws Exception {
|
||||
@Override
|
||||
public void setString(final String name, final String value) throws JMSException {
|
||||
map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
|
||||
}
|
||||
|
||||
public void setBytes(final String name, final byte[] value) throws Exception {
|
||||
@Override
|
||||
public void setBytes(final String name, final byte[] value) throws JMSException {
|
||||
map.putBytesProperty(new SimpleString(name), value);
|
||||
}
|
||||
|
||||
public void setBytes(final String name, final byte[] value, final int offset, final int length) throws Exception {
|
||||
@Override
|
||||
public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException {
|
||||
if (offset + length > value.length) {
|
||||
throw new Exception("Invalid offset/length");
|
||||
throw new JMSException("Invalid offset/length");
|
||||
}
|
||||
byte[] newBytes = new byte[length];
|
||||
System.arraycopy(value, offset, newBytes, 0, length);
|
||||
map.putBytesProperty(new SimpleString(name), newBytes);
|
||||
}
|
||||
|
||||
public void setObject(final String name, final Object value) throws Exception {
|
||||
@Override
|
||||
public void setObject(final String name, final Object value) throws JMSException {
|
||||
try {
|
||||
// primitives and String
|
||||
Object val = value;
|
||||
|
@ -123,75 +137,84 @@ public final class ServerJMSMapMessage extends ServerJMSMessage {
|
|||
}
|
||||
TypedProperties.setObjectProperty(new SimpleString(name), val, map);
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getBoolean(final String name) throws Exception {
|
||||
@Override
|
||||
public boolean getBoolean(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getBooleanProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public byte getByte(final String name) throws Exception {
|
||||
@Override
|
||||
public byte getByte(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getByteProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public short getShort(final String name) throws Exception {
|
||||
@Override
|
||||
public short getShort(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getShortProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public char getChar(final String name) throws Exception {
|
||||
@Override
|
||||
public char getChar(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getCharProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public int getInt(final String name) throws Exception {
|
||||
@Override
|
||||
public int getInt(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getIntProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public long getLong(final String name) throws Exception {
|
||||
@Override
|
||||
public long getLong(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getLongProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public float getFloat(final String name) throws Exception {
|
||||
@Override
|
||||
public float getFloat(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getFloatProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public double getDouble(final String name) throws Exception {
|
||||
@Override
|
||||
public double getDouble(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getDoubleProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public String getString(final String name) throws Exception {
|
||||
@Override
|
||||
public String getString(final String name) throws JMSException {
|
||||
try {
|
||||
SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
|
||||
if (str == null) {
|
||||
|
@ -200,19 +223,21 @@ public final class ServerJMSMapMessage extends ServerJMSMessage {
|
|||
return str.toString();
|
||||
}
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public byte[] getBytes(final String name) throws Exception {
|
||||
@Override
|
||||
public byte[] getBytes(final String name) throws JMSException {
|
||||
try {
|
||||
return map.getBytesProperty(new SimpleString(name));
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public Object getObject(final String name) throws Exception {
|
||||
@Override
|
||||
public Object getObject(final String name) throws JMSException {
|
||||
Object val = map.getProperty(new SimpleString(name));
|
||||
|
||||
if (val instanceof SimpleString) {
|
||||
|
@ -222,16 +247,18 @@ public final class ServerJMSMapMessage extends ServerJMSMessage {
|
|||
return val;
|
||||
}
|
||||
|
||||
public Enumeration getMapNames() throws Exception {
|
||||
@Override
|
||||
public Enumeration getMapNames() throws JMSException {
|
||||
return Collections.enumeration(map.getMapNames());
|
||||
}
|
||||
|
||||
public boolean itemExists(final String name) throws Exception {
|
||||
@Override
|
||||
public boolean itemExists(final String name) throws JMSException {
|
||||
return map.containsProperty(new SimpleString(name));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearBody() throws Exception {
|
||||
public void clearBody() throws JMSException {
|
||||
super.clearBody();
|
||||
|
||||
map.clear();
|
||||
|
|
|
@ -16,13 +16,17 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
|
||||
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
|
||||
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
|
||||
|
@ -32,10 +36,7 @@ import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
|
|||
import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
|
||||
import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
|
||||
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.JMSConstants.DeliveryMode_NON_PERSISTENT;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.JMSConstants.DeliveryMode_PERSISTENT;
|
||||
|
||||
public class ServerJMSMessage {
|
||||
public class ServerJMSMessage implements Message {
|
||||
|
||||
protected final ICoreMessage message;
|
||||
private ActiveMQBuffer readBodyBuffer;
|
||||
|
@ -67,7 +68,6 @@ public class ServerJMSMessage {
|
|||
|
||||
/**
|
||||
* When reading we use a protected copy so multi-threads can work fine
|
||||
* @return
|
||||
*/
|
||||
protected ActiveMQBuffer getReadBodyBuffer() {
|
||||
if (readBodyBuffer == null) {
|
||||
|
@ -79,46 +79,52 @@ public class ServerJMSMessage {
|
|||
|
||||
/**
|
||||
* When writing on the conversion we use the buffer directly
|
||||
* @return
|
||||
*/
|
||||
protected ActiveMQBuffer getWriteBodyBuffer() {
|
||||
readBodyBuffer = null; // it invalidates this buffer if anything is written
|
||||
return message.getBodyBuffer();
|
||||
}
|
||||
|
||||
public final String getJMSMessageID() throws Exception {
|
||||
@Override
|
||||
public final String getJMSMessageID() throws JMSException {
|
||||
if (message.containsProperty(NATIVE_MESSAGE_ID)) {
|
||||
return getStringProperty(NATIVE_MESSAGE_ID);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public final void setJMSMessageID(String id) throws Exception {
|
||||
@Override
|
||||
public final void setJMSMessageID(String id) throws JMSException {
|
||||
if (id != null) {
|
||||
message.putStringProperty(NATIVE_MESSAGE_ID, id);
|
||||
}
|
||||
}
|
||||
|
||||
public final long getJMSTimestamp() throws Exception {
|
||||
@Override
|
||||
public final long getJMSTimestamp() throws JMSException {
|
||||
return message.getTimestamp();
|
||||
}
|
||||
|
||||
public final void setJMSTimestamp(long timestamp) throws Exception {
|
||||
@Override
|
||||
public final void setJMSTimestamp(long timestamp) throws JMSException {
|
||||
message.setTimestamp(timestamp);
|
||||
}
|
||||
|
||||
public final byte[] getJMSCorrelationIDAsBytes() throws Exception {
|
||||
@Override
|
||||
public final byte[] getJMSCorrelationIDAsBytes() throws JMSException {
|
||||
return MessageUtil.getJMSCorrelationIDAsBytes(message);
|
||||
}
|
||||
|
||||
public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws Exception {
|
||||
@Override
|
||||
public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
|
||||
if (correlationID == null || correlationID.length == 0) {
|
||||
throw new Exception("Please specify a non-zero length byte[]");
|
||||
throw new JMSException("Please specify a non-zero length byte[]");
|
||||
}
|
||||
message.setCorrelationID(correlationID);
|
||||
}
|
||||
|
||||
public final String getJMSCorrelationID() throws Exception {
|
||||
@Override
|
||||
public final String getJMSCorrelationID() throws JMSException {
|
||||
|
||||
Object correlationID = message.getCorrelationID();
|
||||
if (correlationID instanceof String) {
|
||||
|
@ -131,178 +137,226 @@ public class ServerJMSMessage {
|
|||
}
|
||||
}
|
||||
|
||||
public final void setJMSCorrelationID(String correlationID) throws Exception {
|
||||
@Override
|
||||
public final void setJMSCorrelationID(String correlationID) throws JMSException {
|
||||
message.setCorrelationID(correlationID);
|
||||
}
|
||||
|
||||
public final SimpleString getJMSReplyTo() throws Exception {
|
||||
return MessageUtil.getJMSReplyTo(message);
|
||||
@Override
|
||||
public final Destination getJMSReplyTo() throws JMSException {
|
||||
SimpleString reply = MessageUtil.getJMSReplyTo(message);
|
||||
if (reply != null) {
|
||||
return ActiveMQDestination.fromPrefixedName(reply.toString());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public final void setJMSReplyTo(String replyTo) throws Exception {
|
||||
MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(replyTo));
|
||||
@Override
|
||||
public final void setJMSReplyTo(Destination replyTo) throws JMSException {
|
||||
MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((ActiveMQDestination) replyTo).getSimpleAddress());
|
||||
}
|
||||
|
||||
public SimpleString getJMSDestination() throws Exception {
|
||||
return message.getAddressSimpleString();
|
||||
@Override
|
||||
public Destination getJMSDestination() throws JMSException {
|
||||
return ActiveMQDestination.createDestination(message.getRoutingType(), message.getAddressSimpleString());
|
||||
}
|
||||
|
||||
public final void setJMSDestination(String destination) throws Exception {
|
||||
message.setAddress(destination);
|
||||
@Override
|
||||
public final void setJMSDestination(Destination destination) throws JMSException {
|
||||
if (destination == null) {
|
||||
message.setAddress((SimpleString) null);
|
||||
} else {
|
||||
message.setAddress(((ActiveMQDestination) destination).getSimpleAddress());
|
||||
}
|
||||
|
||||
public final int getJMSDeliveryMode() throws Exception {
|
||||
return message.isDurable() ? DeliveryMode_PERSISTENT : DeliveryMode_NON_PERSISTENT;
|
||||
}
|
||||
|
||||
public final void setJMSDeliveryMode(int deliveryMode) throws Exception {
|
||||
switch (deliveryMode) {
|
||||
case DeliveryMode_PERSISTENT:
|
||||
@Override
|
||||
public final int getJMSDeliveryMode() throws JMSException {
|
||||
return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void setJMSDeliveryMode(int deliveryMode) throws JMSException {
|
||||
if (deliveryMode == DeliveryMode.PERSISTENT) {
|
||||
message.setDurable(true);
|
||||
break;
|
||||
case DeliveryMode_NON_PERSISTENT:
|
||||
} else if (deliveryMode == DeliveryMode.NON_PERSISTENT) {
|
||||
message.setDurable(false);
|
||||
break;
|
||||
default:
|
||||
throw new Exception("Invalid mode " + deliveryMode);
|
||||
} else {
|
||||
throw new JMSException("Invalid mode " + deliveryMode);
|
||||
}
|
||||
}
|
||||
|
||||
public final boolean getJMSRedelivered() throws Exception {
|
||||
@Override
|
||||
public final boolean getJMSRedelivered() throws JMSException {
|
||||
return false;
|
||||
}
|
||||
|
||||
public final void setJMSRedelivered(boolean redelivered) throws Exception {
|
||||
@Override
|
||||
public final void setJMSRedelivered(boolean redelivered) throws JMSException {
|
||||
// no op
|
||||
}
|
||||
|
||||
public final String getJMSType() throws Exception {
|
||||
@Override
|
||||
public final String getJMSType() throws JMSException {
|
||||
return MessageUtil.getJMSType(message);
|
||||
}
|
||||
|
||||
public final void setJMSType(String type) throws Exception {
|
||||
@Override
|
||||
public final void setJMSType(String type) throws JMSException {
|
||||
MessageUtil.setJMSType(message, type);
|
||||
}
|
||||
|
||||
public final long getJMSExpiration() throws Exception {
|
||||
@Override
|
||||
public final long getJMSExpiration() throws JMSException {
|
||||
return message.getExpiration();
|
||||
}
|
||||
|
||||
public final void setJMSExpiration(long expiration) throws Exception {
|
||||
@Override
|
||||
public final void setJMSExpiration(long expiration) throws JMSException {
|
||||
message.setExpiration(expiration);
|
||||
}
|
||||
|
||||
public final long getJMSDeliveryTime() throws Exception {
|
||||
@Override
|
||||
public final long getJMSDeliveryTime() throws JMSException {
|
||||
// no op
|
||||
return 0;
|
||||
}
|
||||
|
||||
public final void setJMSDeliveryTime(long deliveryTime) throws Exception {
|
||||
@Override
|
||||
public final void setJMSDeliveryTime(long deliveryTime) throws JMSException {
|
||||
// no op
|
||||
}
|
||||
|
||||
public final int getJMSPriority() throws Exception {
|
||||
@Override
|
||||
public final int getJMSPriority() throws JMSException {
|
||||
return message.getPriority();
|
||||
}
|
||||
|
||||
public final void setJMSPriority(int priority) throws Exception {
|
||||
@Override
|
||||
public final void setJMSPriority(int priority) throws JMSException {
|
||||
message.setPriority((byte) priority);
|
||||
}
|
||||
|
||||
public final void clearProperties() throws Exception {
|
||||
@Override
|
||||
public final void clearProperties() throws JMSException {
|
||||
MessageUtil.clearProperties(message);
|
||||
|
||||
}
|
||||
|
||||
public final boolean propertyExists(String name) throws Exception {
|
||||
@Override
|
||||
public final boolean propertyExists(String name) throws JMSException {
|
||||
return MessageUtil.propertyExists(message, name);
|
||||
}
|
||||
|
||||
public final boolean getBooleanProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final boolean getBooleanProperty(String name) throws JMSException {
|
||||
return message.getBooleanProperty(name);
|
||||
}
|
||||
|
||||
public final byte getByteProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final byte getByteProperty(String name) throws JMSException {
|
||||
return message.getByteProperty(name);
|
||||
}
|
||||
|
||||
public final short getShortProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final short getShortProperty(String name) throws JMSException {
|
||||
return message.getShortProperty(name);
|
||||
}
|
||||
|
||||
public final int getIntProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final int getIntProperty(String name) throws JMSException {
|
||||
return MessageUtil.getIntProperty(message, name);
|
||||
}
|
||||
|
||||
public final long getLongProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final long getLongProperty(String name) throws JMSException {
|
||||
return MessageUtil.getLongProperty(message, name);
|
||||
}
|
||||
|
||||
public final float getFloatProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final float getFloatProperty(String name) throws JMSException {
|
||||
return message.getFloatProperty(name);
|
||||
}
|
||||
|
||||
public final double getDoubleProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final double getDoubleProperty(String name) throws JMSException {
|
||||
return message.getDoubleProperty(name);
|
||||
}
|
||||
|
||||
public final String getStringProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final String getStringProperty(String name) throws JMSException {
|
||||
return MessageUtil.getStringProperty(message, name);
|
||||
}
|
||||
|
||||
public final Object getObjectProperty(String name) throws Exception {
|
||||
@Override
|
||||
public final Object getObjectProperty(String name) throws JMSException {
|
||||
return MessageUtil.getObjectProperty(message, name);
|
||||
}
|
||||
|
||||
public final Enumeration getPropertyNames() throws Exception {
|
||||
@Override
|
||||
public final Enumeration getPropertyNames() throws JMSException {
|
||||
return Collections.enumeration(MessageUtil.getPropertyNames(message));
|
||||
}
|
||||
|
||||
public final void setBooleanProperty(String name, boolean value) throws Exception {
|
||||
@Override
|
||||
public final void setBooleanProperty(String name, boolean value) throws JMSException {
|
||||
message.putBooleanProperty(name, value);
|
||||
}
|
||||
|
||||
public final void setByteProperty(String name, byte value) throws Exception {
|
||||
@Override
|
||||
public final void setByteProperty(String name, byte value) throws JMSException {
|
||||
message.putByteProperty(name, value);
|
||||
}
|
||||
|
||||
public final void setShortProperty(String name, short value) throws Exception {
|
||||
@Override
|
||||
public final void setShortProperty(String name, short value) throws JMSException {
|
||||
message.putShortProperty(name, value);
|
||||
}
|
||||
|
||||
public final void setIntProperty(String name, int value) throws Exception {
|
||||
@Override
|
||||
public final void setIntProperty(String name, int value) throws JMSException {
|
||||
MessageUtil.setIntProperty(message, name, value);
|
||||
}
|
||||
|
||||
public final void setLongProperty(String name, long value) throws Exception {
|
||||
@Override
|
||||
public final void setLongProperty(String name, long value) throws JMSException {
|
||||
MessageUtil.setLongProperty(message, name, value);
|
||||
}
|
||||
|
||||
public final void setFloatProperty(String name, float value) throws Exception {
|
||||
@Override
|
||||
public final void setFloatProperty(String name, float value) throws JMSException {
|
||||
message.putFloatProperty(name, value);
|
||||
}
|
||||
|
||||
public final void setDoubleProperty(String name, double value) throws Exception {
|
||||
@Override
|
||||
public final void setDoubleProperty(String name, double value) throws JMSException {
|
||||
message.putDoubleProperty(name, value);
|
||||
}
|
||||
|
||||
public final void setStringProperty(String name, String value) throws Exception {
|
||||
@Override
|
||||
public final void setStringProperty(String name, String value) throws JMSException {
|
||||
MessageUtil.setStringProperty(message, name, value);
|
||||
}
|
||||
|
||||
public final void setObjectProperty(String name, Object value) throws Exception {
|
||||
@Override
|
||||
public final void setObjectProperty(String name, Object value) throws JMSException {
|
||||
MessageUtil.setObjectProperty(message, name, value);
|
||||
}
|
||||
|
||||
public final void acknowledge() throws Exception {
|
||||
@Override
|
||||
public final void acknowledge() throws JMSException {
|
||||
// no op
|
||||
}
|
||||
|
||||
public void clearBody() throws Exception {
|
||||
@Override
|
||||
public void clearBody() throws JMSException {
|
||||
message.getBodyBuffer().clear();
|
||||
}
|
||||
|
||||
public final <T> T getBody(Class<T> c) throws Exception {
|
||||
@Override
|
||||
public final <T> T getBody(Class<T> c) throws JMSException {
|
||||
// no op.. jms2 not used on the conversion
|
||||
return null;
|
||||
}
|
||||
|
@ -322,7 +376,8 @@ public class ServerJMSMessage {
|
|||
}
|
||||
}
|
||||
|
||||
public final boolean isBodyAssignableTo(Class c) throws Exception {
|
||||
@Override
|
||||
public final boolean isBodyAssignableTo(Class c) throws JMSException {
|
||||
// no op.. jms2 not used on the conversion
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -18,12 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
|
||||
public class ServerJMSObjectMessage extends ServerJMSMessage {
|
||||
public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage {
|
||||
|
||||
public static final byte TYPE = Message.OBJECT_TYPE;
|
||||
|
||||
|
@ -33,11 +36,13 @@ public class ServerJMSObjectMessage extends ServerJMSMessage {
|
|||
super(message);
|
||||
}
|
||||
|
||||
public void setObject(Serializable object) throws Exception {
|
||||
@Override
|
||||
public void setObject(Serializable object) throws JMSException {
|
||||
throw new UnsupportedOperationException("Cannot set Object on this internal message");
|
||||
}
|
||||
|
||||
public Serializable getObject() throws Exception {
|
||||
@Override
|
||||
public Serializable getObject() throws JMSException {
|
||||
throw new UnsupportedOperationException("Cannot set Object on this internal message");
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageEOFException;
|
||||
import javax.jms.MessageFormatException;
|
||||
import javax.jms.StreamMessage;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
|
@ -33,7 +38,7 @@ import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadObj
|
|||
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadShort;
|
||||
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadString;
|
||||
|
||||
public final class ServerJMSStreamMessage extends ServerJMSMessage {
|
||||
public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage {
|
||||
|
||||
public static final byte TYPE = Message.STREAM_TYPE;
|
||||
|
||||
|
@ -45,101 +50,110 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage {
|
|||
|
||||
// StreamMessage implementation ----------------------------------
|
||||
|
||||
public boolean readBoolean() throws Exception {
|
||||
@Override
|
||||
public boolean readBoolean() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadBoolean(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public byte readByte() throws Exception {
|
||||
@Override
|
||||
public byte readByte() throws JMSException {
|
||||
try {
|
||||
return streamReadByte(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public short readShort() throws Exception {
|
||||
@Override
|
||||
public short readShort() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadShort(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public char readChar() throws Exception {
|
||||
@Override
|
||||
public char readChar() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadChar(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public int readInt() throws Exception {
|
||||
@Override
|
||||
public int readInt() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadInteger(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public long readLong() throws Exception {
|
||||
@Override
|
||||
public long readLong() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadLong(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public float readFloat() throws Exception {
|
||||
@Override
|
||||
public float readFloat() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadFloat(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public double readDouble() throws Exception {
|
||||
@Override
|
||||
public double readDouble() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadDouble(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public String readString() throws Exception {
|
||||
@Override
|
||||
public String readString() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadString(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,7 +162,8 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage {
|
|||
*/
|
||||
private int len = 0;
|
||||
|
||||
public int readBytes(final byte[] value) throws Exception {
|
||||
@Override
|
||||
public int readBytes(final byte[] value) throws JMSException {
|
||||
|
||||
try {
|
||||
Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value);
|
||||
|
@ -156,95 +171,108 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage {
|
|||
len = pairRead.getA();
|
||||
return pairRead.getB();
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public Object readObject() throws Exception {
|
||||
@Override
|
||||
public Object readObject() throws JMSException {
|
||||
|
||||
if (getReadBodyBuffer().readerIndex() >= getReadBodyBuffer().writerIndex()) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
try {
|
||||
return streamReadObject(getReadBodyBuffer());
|
||||
} catch (IllegalStateException e) {
|
||||
throw new RuntimeException(e.getMessage());
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
} catch (IndexOutOfBoundsException e) {
|
||||
throw new RuntimeException("");
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
}
|
||||
|
||||
public void writeBoolean(final boolean value) throws Exception {
|
||||
@Override
|
||||
public void writeBoolean(final boolean value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN);
|
||||
getWriteBodyBuffer().writeBoolean(value);
|
||||
}
|
||||
|
||||
public void writeByte(final byte value) throws Exception {
|
||||
@Override
|
||||
public void writeByte(final byte value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BYTE);
|
||||
getWriteBodyBuffer().writeByte(value);
|
||||
}
|
||||
|
||||
public void writeShort(final short value) throws Exception {
|
||||
@Override
|
||||
public void writeShort(final short value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.SHORT);
|
||||
getWriteBodyBuffer().writeShort(value);
|
||||
}
|
||||
|
||||
public void writeChar(final char value) throws Exception {
|
||||
@Override
|
||||
public void writeChar(final char value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.CHAR);
|
||||
getWriteBodyBuffer().writeShort((short) value);
|
||||
}
|
||||
|
||||
public void writeInt(final int value) throws Exception {
|
||||
@Override
|
||||
public void writeInt(final int value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.INT);
|
||||
getWriteBodyBuffer().writeInt(value);
|
||||
}
|
||||
|
||||
public void writeLong(final long value) throws Exception {
|
||||
@Override
|
||||
public void writeLong(final long value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.LONG);
|
||||
getWriteBodyBuffer().writeLong(value);
|
||||
}
|
||||
|
||||
public void writeFloat(final float value) throws Exception {
|
||||
@Override
|
||||
public void writeFloat(final float value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.FLOAT);
|
||||
getWriteBodyBuffer().writeInt(Float.floatToIntBits(value));
|
||||
}
|
||||
|
||||
public void writeDouble(final double value) throws Exception {
|
||||
@Override
|
||||
public void writeDouble(final double value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.DOUBLE);
|
||||
getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value));
|
||||
}
|
||||
|
||||
public void writeString(final String value) throws Exception {
|
||||
@Override
|
||||
public void writeString(final String value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.STRING);
|
||||
getWriteBodyBuffer().writeNullableString(value);
|
||||
}
|
||||
|
||||
public void writeBytes(final byte[] value) throws Exception {
|
||||
@Override
|
||||
public void writeBytes(final byte[] value) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BYTES);
|
||||
getWriteBodyBuffer().writeInt(value.length);
|
||||
getWriteBodyBuffer().writeBytes(value);
|
||||
}
|
||||
|
||||
public void writeBytes(final byte[] value, final int offset, final int length) throws Exception {
|
||||
@Override
|
||||
public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
|
||||
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BYTES);
|
||||
getWriteBodyBuffer().writeInt(length);
|
||||
getWriteBodyBuffer().writeBytes(value, offset, length);
|
||||
}
|
||||
|
||||
public void writeObject(final Object value) throws Exception {
|
||||
@Override
|
||||
public void writeObject(final Object value) throws JMSException {
|
||||
if (value instanceof String) {
|
||||
writeString((String) value);
|
||||
} else if (value instanceof Boolean) {
|
||||
|
@ -268,18 +296,19 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage {
|
|||
} else if (value == null) {
|
||||
writeString(null);
|
||||
} else {
|
||||
throw new RuntimeException("Invalid object type: " + value.getClass());
|
||||
throw new MessageFormatException("Invalid object type: " + value.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
public void reset() throws Exception {
|
||||
@Override
|
||||
public void reset() throws JMSException {
|
||||
getWriteBodyBuffer().resetReaderIndex();
|
||||
}
|
||||
|
||||
// ActiveMQRAMessage overrides ----------------------------------------
|
||||
|
||||
@Override
|
||||
public void clearBody() throws Exception {
|
||||
public void clearBody() throws JMSException {
|
||||
super.clearBody();
|
||||
|
||||
getWriteBodyBuffer().clear();
|
||||
|
@ -292,7 +321,6 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage {
|
|||
|
||||
/**
|
||||
* Encode the body into the internal message
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Override
|
||||
public void encode() throws Exception {
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -28,7 +31,7 @@ import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText;
|
|||
* <br>
|
||||
* This class was ported from SpyTextMessage in JBossMQ.
|
||||
*/
|
||||
public class ServerJMSTextMessage extends ServerJMSMessage {
|
||||
public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
public static final byte TYPE = Message.TEXT_TYPE;
|
||||
|
@ -52,7 +55,8 @@ public class ServerJMSTextMessage extends ServerJMSMessage {
|
|||
}
|
||||
// TextMessage implementation ------------------------------------
|
||||
|
||||
public void setText(final String text) throws Exception {
|
||||
@Override
|
||||
public void setText(final String text) throws JMSException {
|
||||
if (text != null) {
|
||||
this.text = new SimpleString(text);
|
||||
} else {
|
||||
|
@ -62,6 +66,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage {
|
|||
writeBodyText(getWriteBodyBuffer(), this.text);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getText() {
|
||||
if (text != null) {
|
||||
return text.toString();
|
||||
|
@ -71,7 +76,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void clearBody() throws Exception {
|
||||
public void clearBody() throws JMSException {
|
||||
super.clearBody();
|
||||
|
||||
text = null;
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
|
|||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
|
||||
|
@ -63,7 +64,6 @@ import org.apache.activemq.artemis.selector.filter.FilterException;
|
|||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.DestinationUtil;
|
||||
import org.apache.qpid.proton.amqp.DescribedType;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||
|
@ -842,7 +842,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
final boolean durable = !isVolatile;
|
||||
final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
|
||||
final String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId;
|
||||
return DestinationUtil.createQueueNameForSubscription(durable, clientID, subscriptionName);
|
||||
return ActiveMQDestination.createQueueNameForSubscription(durable, clientID, subscriptionName);
|
||||
} else {
|
||||
String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
|
||||
if (shared) {
|
||||
|
|
|
@ -120,7 +120,7 @@ public class TestConversions extends Assert {
|
|||
Assert.assertArrayEquals(bodyBytes, newBodyBytes);
|
||||
}
|
||||
|
||||
private void verifyProperties(ServerJMSMessage message) throws Exception {
|
||||
private void verifyProperties(javax.jms.Message message) throws Exception {
|
||||
assertEquals(true, message.getBooleanProperty("true"));
|
||||
assertEquals(false, message.getBooleanProperty("false"));
|
||||
assertEquals("bar", message.getStringProperty("foo"));
|
||||
|
|
|
@ -29,6 +29,12 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
|
@ -43,17 +49,17 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMess
|
|||
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
|
||||
public class JMSMappingInboundTransformerTest {
|
||||
|
||||
|
@ -79,7 +85,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
|
||||
ICoreMessage coreMessage = messageEncode.toCore();
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(coreMessage);
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(coreMessage);
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
|
@ -96,7 +102,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
|
||||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
|
@ -107,7 +113,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
message.setContentType("text/plain");
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
|
||||
|
@ -131,7 +137,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
|
||||
|
||||
AMQPStandardMessage amqp = encodeAndCreateAMQPMessage(message);
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(amqp.toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(amqp.toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
|
@ -152,7 +158,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setBody(new Data(binary));
|
||||
message.setContentType("unknown-content-type");
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
|
@ -173,12 +179,20 @@ public class JMSMappingInboundTransformerTest {
|
|||
|
||||
assertNull(message.getContentType());
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that receiving a data body containing nothing, but with the content type set to
|
||||
* {@value AMQPMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an
|
||||
* ObjectMessage when not otherwise annotated to indicate the type of JMS message it is.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
|
||||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
|
@ -186,7 +200,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setBody(new Data(binary));
|
||||
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
|
||||
|
@ -286,7 +300,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setBody(new Data(binary));
|
||||
message.setContentType(contentType);
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
if (StandardCharsets.UTF_8.equals(expectedCharset)) {
|
||||
|
@ -310,7 +324,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
message.setBody(new AmqpValue("content"));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
|
||||
|
@ -328,7 +342,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
message.setBody(new AmqpValue(null));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
|
||||
|
@ -348,7 +362,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setBody(new AmqpValue(new Binary(new byte[0])));
|
||||
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
|
||||
|
@ -367,7 +381,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
Map<String, String> map = new HashMap<>();
|
||||
message.setBody(new AmqpValue(map));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
|
||||
|
@ -386,7 +400,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
List<String> list = new ArrayList<>();
|
||||
message.setBody(new AmqpValue(list));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
|
||||
|
@ -405,7 +419,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
List<String> list = new ArrayList<>();
|
||||
message.setBody(new AmqpSequence(list));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
|
||||
|
@ -424,7 +438,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
Binary binary = new Binary(new byte[0]);
|
||||
message.setBody(new AmqpValue(binary));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
|
@ -443,7 +457,7 @@ public class JMSMappingInboundTransformerTest {
|
|||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
message.setBody(new AmqpValue(UUID.randomUUID()));
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
|
||||
assertNotNull("Message should not be null", jmsMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
|
||||
|
@ -458,10 +472,10 @@ public class JMSMappingInboundTransformerTest {
|
|||
ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
jmsMessage.decode();
|
||||
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof ServerJMSTextMessage);
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
|
||||
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
|
||||
|
||||
ServerJMSTextMessage textMessage = jmsMessage;
|
||||
TextMessage textMessage = (TextMessage) jmsMessage;
|
||||
|
||||
assertNotNull(textMessage.getText());
|
||||
assertEquals(contentString, textMessage.getText());
|
||||
|
@ -471,30 +485,30 @@ public class JMSMappingInboundTransformerTest {
|
|||
|
||||
@Test
|
||||
public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(null);
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Destination.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithQueueStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue");
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryQueueStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue,temporary");
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTopicStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic");
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryTopicStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic,temporary");
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class);
|
||||
}
|
||||
|
||||
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue)
|
||||
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass)
|
||||
throws Exception {
|
||||
|
||||
String toAddress = "toAddress";
|
||||
|
@ -508,39 +522,38 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setMessageAnnotations(ma);
|
||||
}
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
assertTrue("Expected ServerJMSTextMessage", jmsMessage instanceof ServerJMSTextMessage);
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
|
||||
}
|
||||
|
||||
// ----- ReplyTo Conversions ----------------------------------------------//
|
||||
|
||||
|
||||
@Test
|
||||
public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null);
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Destination.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue");
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue,temporary");
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic");
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic,temporary");
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class);
|
||||
}
|
||||
|
||||
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue)
|
||||
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass)
|
||||
throws Exception {
|
||||
|
||||
String replyToAddress = "replyToAddress";
|
||||
|
@ -554,8 +567,8 @@ public class JMSMappingInboundTransformerTest {
|
|||
message.setMessageAnnotations(ma);
|
||||
}
|
||||
|
||||
ServerJMSMessage jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof ServerJMSTextMessage);
|
||||
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
|
||||
}
|
||||
|
||||
private AMQPStandardMessage encodeAndCreateAMQPMessage(MessageImpl message) {
|
||||
|
|
|
@ -40,9 +40,15 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
|
||||
|
@ -467,8 +473,7 @@ public class JMSMappingOutboundTransformerTest {
|
|||
doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE);
|
||||
}
|
||||
|
||||
|
||||
private void doTestConvertMessageWithJMSDestination(String jmsDestination, Object expectedAnnotationValue) throws Exception {
|
||||
private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception {
|
||||
ServerJMSTextMessage textMessage = createTextMessage();
|
||||
textMessage.setText("myTextMessageContent");
|
||||
textMessage.setJMSDestination(jmsDestination);
|
||||
|
@ -485,7 +490,7 @@ public class JMSMappingOutboundTransformerTest {
|
|||
}
|
||||
|
||||
if (jmsDestination != null) {
|
||||
assertEquals("Unexpected 'to' address", jmsDestination, amqp.getAddress());
|
||||
assertEquals("Unexpected 'to' address", AMQPMessageSupport.toAddress(jmsDestination), amqp.getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -501,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
|
|||
doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), QUEUE_TYPE);
|
||||
}
|
||||
|
||||
private void doTestConvertMessageWithJMSReplyTo(String jmsReplyTo, Object expectedAnnotationValue) throws Exception {
|
||||
private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
|
||||
ServerJMSTextMessage textMessage = createTextMessage();
|
||||
textMessage.setText("myTextMessageContent");
|
||||
textMessage.setJMSReplyTo(jmsReplyTo);
|
||||
|
@ -518,24 +523,34 @@ public class JMSMappingOutboundTransformerTest {
|
|||
}
|
||||
|
||||
if (jmsReplyTo != null) {
|
||||
assertEquals("Unexpected 'reply-to' address", jmsReplyTo, amqp.getReplyTo().toString());
|
||||
assertEquals("Unexpected 'reply-to' address", AMQPMessageSupport.toAddress(jmsReplyTo).toString(), amqp.getReplyTo().toString());
|
||||
}
|
||||
}
|
||||
|
||||
// ----- Utility Methods used for this Test -------------------------------//
|
||||
|
||||
private String createDestination(byte destType) {
|
||||
private Destination createDestination(byte destType) {
|
||||
Destination destination = null;
|
||||
String prefix = PrefixUtil.getURIPrefix(TEST_ADDRESS);
|
||||
String address = PrefixUtil.removePrefix(TEST_ADDRESS, prefix);
|
||||
switch (destType) {
|
||||
case QUEUE_TYPE:
|
||||
destination = new ActiveMQQueue(address);
|
||||
break;
|
||||
case TOPIC_TYPE:
|
||||
destination = new ActiveMQTopic(address);
|
||||
break;
|
||||
case TEMP_QUEUE_TYPE:
|
||||
destination = new ActiveMQTemporaryQueue(address, null);
|
||||
break;
|
||||
case TEMP_TOPIC_TYPE:
|
||||
return address;
|
||||
destination = new ActiveMQTemporaryTopic(address, null);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invliad Destination Type given/");
|
||||
}
|
||||
|
||||
return destination;
|
||||
}
|
||||
|
||||
private ServerJMSMessage createMessage() {
|
||||
|
@ -582,7 +597,7 @@ public class JMSMappingOutboundTransformerTest {
|
|||
|
||||
try {
|
||||
result.setText(text);
|
||||
} catch (Exception e) {
|
||||
} catch (JMSException e) {
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
Loading…
Reference in New Issue