ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ
Ive copied over the source itself https://issues.apache.org/jira/browse/ARTEMIS-503
This commit is contained in:
parent
7fb603f78f
commit
c161ab46a6
|
@ -186,10 +186,6 @@
|
|||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec-mqtt</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-amqp</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -80,7 +80,6 @@
|
|||
<include>org.jboss.logging:jboss-logging</include>
|
||||
<include>io.netty:netty-all</include>
|
||||
<include>org.apache.qpid:proton-j</include>
|
||||
<include>org.apache.activemq:activemq-amqp</include>
|
||||
<include>org.apache.activemq:activemq-client</include>
|
||||
<include>org.slf4j:slf4j-api</include>
|
||||
<include>io.airlift:airline</include>
|
||||
|
|
|
@ -41,10 +41,6 @@
|
|||
<artifactId>artemis-core-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-amqp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging-processor</artifactId>
|
||||
|
|
|
@ -26,6 +26,7 @@ import javax.jms.TextMessage;
|
|||
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSObjectMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSVendor;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
|
||||
|
@ -36,7 +37,6 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST
|
|||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||
import org.apache.activemq.transport.amqp.message.JMSVendor;
|
||||
|
||||
public class ActiveMQJMSVendor implements JMSVendor {
|
||||
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
||||
|
||||
import org.apache.activemq.transport.amqp.message.JMSVendor;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
class JMSMappingInboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer {
|
||||
|
||||
JMSMappingInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
|
||||
super.populateMessage(jms, amqp);
|
||||
final Properties properties = amqp.getProperties();
|
||||
if (properties != null) {
|
||||
if (properties.getMessageId() != null) {
|
||||
if (properties.getMessageId() instanceof Long) {
|
||||
jms.setLongProperty(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID", (Long) properties.getMessageId());
|
||||
}
|
||||
else if (properties.getMessageId() instanceof UnsignedLong) {
|
||||
jms.setLongProperty(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID", ((UnsignedLong) properties.getMessageId()).longValue());
|
||||
}
|
||||
else {
|
||||
jms.setStringProperty(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID", properties.getMessageId().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,53 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
||||
|
||||
import org.apache.activemq.transport.amqp.message.JMSVendor;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Map;
|
||||
|
||||
class JMSMappingOutboundTransformer extends org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer {
|
||||
JMSMappingOutboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
|
||||
ProtonJMessage protonJMessage = super.convert(msg);
|
||||
|
||||
Map properties = protonJMessage.getApplicationProperties().getValue();
|
||||
|
||||
if (properties.containsKey(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID")) {
|
||||
Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_LONG_MESSAGE_ID");
|
||||
protonJMessage.setMessageId(id);
|
||||
}
|
||||
else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID")) {
|
||||
Long id = (Long) properties.remove(this.getPrefixVendor() + "NATIVE_UNSIGNED_LONG_MESSAGE_ID");
|
||||
protonJMessage.setMessageId(new UnsignedLong(id));
|
||||
}
|
||||
else if (properties.containsKey(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID")) {
|
||||
String id = (String) properties.remove(this.getPrefixVendor() + "NATIVE_STRING_MESSAGE_ID");
|
||||
protonJMessage.setMessageId(id);
|
||||
}
|
||||
return protonJMessage;
|
||||
}
|
||||
}
|
|
@ -17,9 +17,12 @@
|
|||
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
||||
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||
import org.apache.activemq.transport.amqp.message.InboundTransformer;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.AMQPNativeOutboundTransformer;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.InboundTransformer;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingInboundTransformer;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.JMSMappingOutboundTransformer;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.reader.MessageUtil;
|
|||
public class ServerJMSMessage implements Message {
|
||||
|
||||
protected final MessageInternal message;
|
||||
private final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";
|
||||
|
||||
protected int deliveryCount;
|
||||
|
||||
|
@ -65,11 +66,17 @@ public class ServerJMSMessage implements Message {
|
|||
|
||||
@Override
|
||||
public final String getJMSMessageID() throws JMSException {
|
||||
if (message.containsProperty(NATIVE_MESSAGE_ID)) {
|
||||
return getStringProperty(NATIVE_MESSAGE_ID);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void setJMSMessageID(String id) throws JMSException {
|
||||
if (id != null) {
|
||||
message.putStringProperty(NATIVE_MESSAGE_ID, id);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.protocol.proton.converter.jms;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
|
|
|
@ -0,0 +1,257 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Helper class for identifying and converting message-id and correlation-id values between
|
||||
* the AMQP types and the Strings values used by JMS.
|
||||
* <p>
|
||||
* <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
|
||||
* message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
|
||||
* for interoperability with other AMQP clients, the following encoding can be used after removing or
|
||||
* before adding the "ID:" prefix used for a JMSMessageID value:<br>
|
||||
* <p>
|
||||
* {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
|
||||
* {@literal "AMQP_UUID:<string representation of uuid>"}<br>
|
||||
* {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
|
||||
* {@literal "AMQP_STRING:<string>"}<br>
|
||||
* <p>
|
||||
* <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
|
||||
* with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
|
||||
* <p>
|
||||
* <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
|
||||
* ulong but can't be converted into the indicated format, an exception will be thrown.
|
||||
*/
|
||||
public class AMQPMessageIdHelper {
|
||||
|
||||
public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
|
||||
|
||||
public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
|
||||
public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
|
||||
public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
|
||||
public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
|
||||
|
||||
private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
|
||||
private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
|
||||
private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
|
||||
private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
|
||||
private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
|
||||
|
||||
/**
|
||||
* Takes the provided AMQP messageId style object, and convert it to a base string.
|
||||
* Encodes type information as a prefix where necessary to convey or escape the type
|
||||
* of the provided object.
|
||||
*
|
||||
* @param messageId the raw messageId object to process
|
||||
* @return the base string to be used in creating the actual id.
|
||||
*/
|
||||
public String toBaseMessageIdString(Object messageId) {
|
||||
if (messageId == null) {
|
||||
return null;
|
||||
}
|
||||
else if (messageId instanceof String) {
|
||||
String stringId = (String) messageId;
|
||||
|
||||
// If the given string has a type encoding prefix,
|
||||
// we need to escape it as an encoded string (even if
|
||||
// the existing encoding prefix was also for string)
|
||||
if (hasTypeEncodingPrefix(stringId)) {
|
||||
return AMQP_STRING_PREFIX + stringId;
|
||||
}
|
||||
else {
|
||||
return stringId;
|
||||
}
|
||||
}
|
||||
else if (messageId instanceof UUID) {
|
||||
return AMQP_UUID_PREFIX + messageId.toString();
|
||||
}
|
||||
else if (messageId instanceof UnsignedLong) {
|
||||
return AMQP_ULONG_PREFIX + messageId.toString();
|
||||
}
|
||||
else if (messageId instanceof Binary) {
|
||||
ByteBuffer dup = ((Binary) messageId).asByteBuffer();
|
||||
|
||||
byte[] bytes = new byte[dup.remaining()];
|
||||
dup.get(bytes);
|
||||
|
||||
String hex = convertBinaryToHexString(bytes);
|
||||
|
||||
return AMQP_BINARY_PREFIX + hex;
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the provided base id string and return the appropriate amqp messageId style object.
|
||||
* Converts the type based on any relevant encoding information found as a prefix.
|
||||
*
|
||||
* @param baseId the object to be converted to an AMQP MessageId value.
|
||||
* @return the AMQP messageId style object
|
||||
* @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type.
|
||||
*/
|
||||
public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
|
||||
if (baseId == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (hasAmqpUuidPrefix(baseId)) {
|
||||
String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
|
||||
return UUID.fromString(uuidString);
|
||||
}
|
||||
else if (hasAmqpUlongPrefix(baseId)) {
|
||||
String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
|
||||
return UnsignedLong.valueOf(longString);
|
||||
}
|
||||
else if (hasAmqpStringPrefix(baseId)) {
|
||||
return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
|
||||
}
|
||||
else if (hasAmqpBinaryPrefix(baseId)) {
|
||||
String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
|
||||
byte[] bytes = convertHexStringToBinary(hexString);
|
||||
return new Binary(bytes);
|
||||
}
|
||||
else {
|
||||
// We have a string without any type prefix, transmit it as-is.
|
||||
return baseId;
|
||||
}
|
||||
}
|
||||
catch (IllegalArgumentException e) {
|
||||
throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the provided hex-string into a binary representation where each byte represents
|
||||
* two characters of the hex string.
|
||||
* <p>
|
||||
* The hex characters may be upper or lower case.
|
||||
*
|
||||
* @param hexString string to convert to a binary value.
|
||||
* @return a byte array containing the binary representation
|
||||
* @throws IllegalArgumentException if the provided String is a non-even length or contains
|
||||
* non-hex characters
|
||||
*/
|
||||
public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
|
||||
int length = hexString.length();
|
||||
|
||||
// As each byte needs two characters in the hex encoding, the string must be an even length.
|
||||
if (length % 2 != 0) {
|
||||
throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
|
||||
}
|
||||
|
||||
byte[] binary = new byte[length / 2];
|
||||
|
||||
for (int i = 0; i < length; i += 2) {
|
||||
char highBitsChar = hexString.charAt(i);
|
||||
char lowBitsChar = hexString.charAt(i + 1);
|
||||
|
||||
int highBits = hexCharToInt(highBitsChar, hexString) << 4;
|
||||
int lowBits = hexCharToInt(lowBitsChar, hexString);
|
||||
|
||||
binary[i / 2] = (byte) (highBits + lowBits);
|
||||
}
|
||||
|
||||
return binary;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the provided binary into a hex-string representation where each character
|
||||
* represents 4 bits of the provided binary, i.e each byte requires two characters.
|
||||
* <p>
|
||||
* The returned hex characters are upper-case.
|
||||
*
|
||||
* @param bytes the binary value to convert to a hex String instance.
|
||||
* @return a String containing a hex representation of the bytes
|
||||
*/
|
||||
public String convertBinaryToHexString(byte[] bytes) {
|
||||
// Each byte is represented as 2 chars
|
||||
StringBuilder builder = new StringBuilder(bytes.length * 2);
|
||||
|
||||
for (byte b : bytes) {
|
||||
// The byte will be expanded to int before shifting, replicating the
|
||||
// sign bit, so mask everything beyond the first 4 bits afterwards
|
||||
int highBitsInt = (b >> 4) & 0xF;
|
||||
// We only want the first 4 bits
|
||||
int lowBitsInt = b & 0xF;
|
||||
|
||||
builder.append(HEX_CHARS[highBitsInt]);
|
||||
builder.append(HEX_CHARS[lowBitsInt]);
|
||||
}
|
||||
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
//----- Internal implementation ------------------------------------------//
|
||||
|
||||
private boolean hasTypeEncodingPrefix(String stringId) {
|
||||
return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
|
||||
hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
|
||||
}
|
||||
|
||||
private boolean hasAmqpStringPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_STRING_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUlongPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_ULONG_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUuidPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_UUID_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpBinaryPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_BINARY_PREFIX);
|
||||
}
|
||||
|
||||
private String strip(String id, int numChars) {
|
||||
return id.substring(numChars);
|
||||
}
|
||||
|
||||
private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
|
||||
if (ch >= '0' && ch <= '9') {
|
||||
// subtract '0' to get difference in position as an int
|
||||
return ch - '0';
|
||||
}
|
||||
else if (ch >= 'A' && ch <= 'F') {
|
||||
// subtract 'A' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'A'
|
||||
return ch - 'A' + 10;
|
||||
}
|
||||
else if (ch >= 'a' && ch <= 'f') {
|
||||
// subtract 'a' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'a'
|
||||
return ch - 'a' + 10;
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
|
||||
|
||||
public AMQPNativeInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerName() {
|
||||
return TRANSFORMER_NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InboundTransformer getFallbackTransformer() {
|
||||
return new AMQPRawInboundTransformer(getVendor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||
|
||||
Message rc = super.transform(amqpMessage);
|
||||
|
||||
populateMessage(rc, amqp);
|
||||
return rc;
|
||||
}
|
||||
}
|
|
@ -1,10 +1,10 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
|
@ -14,9 +14,8 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter;
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||
|
@ -24,8 +23,13 @@ import org.apache.qpid.proton.message.ProtonJMessage;
|
|||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
public class AMQPNativeOutboundTransformer {
|
||||
static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
|
||||
public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
||||
|
||||
public AMQPNativeOutboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
|
||||
byte[] data = new byte[(int) msg.getBodyLength()];
|
||||
msg.readBytes(data);
|
||||
msg.reset();
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Message;
|
||||
|
||||
public class AMQPRawInboundTransformer extends InboundTransformer {
|
||||
|
||||
public AMQPRawInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerName() {
|
||||
return TRANSFORMER_RAW;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InboundTransformer getFallbackTransformer() {
|
||||
return null; // No fallback from full raw transform
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
BytesMessage rc = vendor.createBytesMessage();
|
||||
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
|
||||
|
||||
// We cannot decode the message headers to check so err on the side of caution
|
||||
// and mark all messages as persistent.
|
||||
rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
rc.setJMSPriority(defaultPriority);
|
||||
|
||||
final long now = System.currentTimeMillis();
|
||||
rc.setJMSTimestamp(now);
|
||||
if (defaultTtl > 0) {
|
||||
rc.setJMSExpiration(now + defaultTtl);
|
||||
}
|
||||
|
||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
|
||||
|
||||
return rc;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
|
||||
public class EncodedMessage {
|
||||
|
||||
private final Binary data;
|
||||
final long messageFormat;
|
||||
|
||||
public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
|
||||
this.data = new Binary(data, offset, length);
|
||||
this.messageFormat = messageFormat;
|
||||
}
|
||||
|
||||
public long getMessageFormat() {
|
||||
return messageFormat;
|
||||
}
|
||||
|
||||
public Message decode() throws Exception {
|
||||
Message amqp = Message.Factory.create();
|
||||
|
||||
int offset = getArrayOffset();
|
||||
int len = getLength();
|
||||
while (len > 0) {
|
||||
final int decoded = amqp.decode(getArray(), offset, len);
|
||||
assert decoded > 0 : "Make progress decoding the message";
|
||||
offset += decoded;
|
||||
len -= decoded;
|
||||
}
|
||||
|
||||
return amqp;
|
||||
}
|
||||
|
||||
public int getLength() {
|
||||
return data.getLength();
|
||||
}
|
||||
|
||||
public int getArrayOffset() {
|
||||
return data.getArrayOffset();
|
||||
}
|
||||
|
||||
public byte[] getArray() {
|
||||
return data.getArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,317 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Decimal128;
|
||||
import org.apache.qpid.proton.amqp.Decimal32;
|
||||
import org.apache.qpid.proton.amqp.Decimal64;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.apache.qpid.proton.amqp.UnsignedShort;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.amqp.messaging.Footer;
|
||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
||||
|
||||
public abstract class InboundTransformer {
|
||||
|
||||
JMSVendor vendor;
|
||||
|
||||
public static final String TRANSFORMER_NATIVE = "native";
|
||||
public static final String TRANSFORMER_RAW = "raw";
|
||||
public static final String TRANSFORMER_JMS = "jms";
|
||||
|
||||
String prefixVendor = "JMS_AMQP_";
|
||||
String prefixDeliveryAnnotations = "DA_";
|
||||
String prefixMessageAnnotations = "MA_";
|
||||
String prefixFooter = "FT_";
|
||||
|
||||
int defaultDeliveryMode = DeliveryMode.NON_PERSISTENT;
|
||||
int defaultPriority = Message.DEFAULT_PRIORITY;
|
||||
long defaultTtl = Message.DEFAULT_TIME_TO_LIVE;
|
||||
|
||||
public InboundTransformer(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
}
|
||||
|
||||
public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
|
||||
|
||||
public abstract String getTransformerName();
|
||||
|
||||
public abstract InboundTransformer getFallbackTransformer();
|
||||
|
||||
public int getDefaultDeliveryMode() {
|
||||
return defaultDeliveryMode;
|
||||
}
|
||||
|
||||
public void setDefaultDeliveryMode(int defaultDeliveryMode) {
|
||||
this.defaultDeliveryMode = defaultDeliveryMode;
|
||||
}
|
||||
|
||||
public int getDefaultPriority() {
|
||||
return defaultPriority;
|
||||
}
|
||||
|
||||
public void setDefaultPriority(int defaultPriority) {
|
||||
this.defaultPriority = defaultPriority;
|
||||
}
|
||||
|
||||
public long getDefaultTtl() {
|
||||
return defaultTtl;
|
||||
}
|
||||
|
||||
public void setDefaultTtl(long defaultTtl) {
|
||||
this.defaultTtl = defaultTtl;
|
||||
}
|
||||
|
||||
public String getPrefixVendor() {
|
||||
return prefixVendor;
|
||||
}
|
||||
|
||||
public void setPrefixVendor(String prefixVendor) {
|
||||
this.prefixVendor = prefixVendor;
|
||||
}
|
||||
|
||||
public JMSVendor getVendor() {
|
||||
return vendor;
|
||||
}
|
||||
|
||||
public void setVendor(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
|
||||
Header header = amqp.getHeader();
|
||||
if (header == null) {
|
||||
header = new Header();
|
||||
}
|
||||
|
||||
if (header.getDurable() != null) {
|
||||
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||
}
|
||||
else {
|
||||
jms.setJMSDeliveryMode(defaultDeliveryMode);
|
||||
}
|
||||
|
||||
if (header.getPriority() != null) {
|
||||
jms.setJMSPriority(header.getPriority().intValue());
|
||||
}
|
||||
else {
|
||||
jms.setJMSPriority(defaultPriority);
|
||||
}
|
||||
|
||||
if (header.getFirstAcquirer() != null) {
|
||||
jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
||||
}
|
||||
|
||||
if (header.getDeliveryCount() != null) {
|
||||
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
|
||||
}
|
||||
|
||||
final MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||
if (ma != null) {
|
||||
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
|
||||
// Legacy annotation, JMSType value will be replaced by Subject further down if also present.
|
||||
jms.setJMSType(entry.getValue().toString());
|
||||
}
|
||||
else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
|
||||
long deliveryTime = ((Number) entry.getValue()).longValue();
|
||||
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
|
||||
}
|
||||
else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
|
||||
long delay = ((Number) entry.getValue()).longValue();
|
||||
if (delay > 0) {
|
||||
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
|
||||
}
|
||||
}
|
||||
//todo
|
||||
/*else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) {
|
||||
int repeat = ((Number) entry.getValue()).intValue();
|
||||
if (repeat > 0) {
|
||||
jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
|
||||
}
|
||||
} else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) {
|
||||
long period = ((Number) entry.getValue()).longValue();
|
||||
if (period > 0) {
|
||||
jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
|
||||
}
|
||||
} else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) {
|
||||
String cronEntry = (String) entry.getValue();
|
||||
if (cronEntry != null) {
|
||||
jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
|
||||
}
|
||||
}*/
|
||||
|
||||
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
final ApplicationProperties ap = amqp.getApplicationProperties();
|
||||
if (ap != null) {
|
||||
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
if ("JMSXGroupID".equals(key)) {
|
||||
vendor.setJMSXGroupID(jms, entry.getValue().toString());
|
||||
}
|
||||
else if ("JMSXGroupSequence".equals(key)) {
|
||||
vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
|
||||
}
|
||||
else if ("JMSXUserID".equals(key)) {
|
||||
vendor.setJMSXUserID(jms, entry.getValue().toString());
|
||||
}
|
||||
else {
|
||||
setProperty(jms, key, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Properties properties = amqp.getProperties();
|
||||
if (properties != null) {
|
||||
if (properties.getMessageId() != null) {
|
||||
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
|
||||
}
|
||||
Binary userId = properties.getUserId();
|
||||
if (userId != null) {
|
||||
vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
|
||||
}
|
||||
if (properties.getTo() != null) {
|
||||
jms.setJMSDestination(vendor.createDestination(properties.getTo()));
|
||||
}
|
||||
if (properties.getSubject() != null) {
|
||||
jms.setJMSType(properties.getSubject());
|
||||
}
|
||||
if (properties.getReplyTo() != null) {
|
||||
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
|
||||
}
|
||||
if (properties.getCorrelationId() != null) {
|
||||
jms.setJMSCorrelationID(properties.getCorrelationId().toString());
|
||||
}
|
||||
if (properties.getContentType() != null) {
|
||||
jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
|
||||
}
|
||||
if (properties.getContentEncoding() != null) {
|
||||
jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
|
||||
}
|
||||
if (properties.getCreationTime() != null) {
|
||||
jms.setJMSTimestamp(properties.getCreationTime().getTime());
|
||||
}
|
||||
if (properties.getGroupId() != null) {
|
||||
vendor.setJMSXGroupID(jms, properties.getGroupId());
|
||||
}
|
||||
if (properties.getGroupSequence() != null) {
|
||||
vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
|
||||
}
|
||||
if (properties.getReplyToGroupId() != null) {
|
||||
jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
|
||||
}
|
||||
if (properties.getAbsoluteExpiryTime() != null) {
|
||||
jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
|
||||
}
|
||||
}
|
||||
|
||||
// If the jms expiration has not yet been set...
|
||||
if (jms.getJMSExpiration() == 0) {
|
||||
// Then lets try to set it based on the message ttl.
|
||||
long ttl = defaultTtl;
|
||||
if (header.getTtl() != null) {
|
||||
ttl = header.getTtl().longValue();
|
||||
}
|
||||
|
||||
if (ttl == 0) {
|
||||
jms.setJMSExpiration(0);
|
||||
}
|
||||
else {
|
||||
jms.setJMSExpiration(System.currentTimeMillis() + ttl);
|
||||
}
|
||||
}
|
||||
|
||||
final Footer fp = amqp.getFooter();
|
||||
if (fp != null) {
|
||||
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setProperty(Message msg, String key, Object value) throws JMSException {
|
||||
if (value instanceof UnsignedLong) {
|
||||
long v = ((UnsignedLong) value).longValue();
|
||||
msg.setLongProperty(key, v);
|
||||
}
|
||||
else if (value instanceof UnsignedInteger) {
|
||||
long v = ((UnsignedInteger) value).longValue();
|
||||
if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
|
||||
msg.setIntProperty(key, (int) v);
|
||||
}
|
||||
else {
|
||||
msg.setLongProperty(key, v);
|
||||
}
|
||||
}
|
||||
else if (value instanceof UnsignedShort) {
|
||||
int v = ((UnsignedShort) value).intValue();
|
||||
if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
|
||||
msg.setShortProperty(key, (short) v);
|
||||
}
|
||||
else {
|
||||
msg.setIntProperty(key, v);
|
||||
}
|
||||
}
|
||||
else if (value instanceof UnsignedByte) {
|
||||
short v = ((UnsignedByte) value).shortValue();
|
||||
if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
|
||||
msg.setByteProperty(key, (byte) v);
|
||||
}
|
||||
else {
|
||||
msg.setShortProperty(key, v);
|
||||
}
|
||||
}
|
||||
else if (value instanceof Symbol) {
|
||||
msg.setStringProperty(key, value.toString());
|
||||
}
|
||||
else if (value instanceof Decimal128) {
|
||||
msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
|
||||
}
|
||||
else if (value instanceof Decimal64) {
|
||||
msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
|
||||
}
|
||||
else if (value instanceof Decimal32) {
|
||||
msg.setFloatProperty(key, ((Decimal32) value).floatValue());
|
||||
}
|
||||
else if (value instanceof Binary) {
|
||||
msg.setStringProperty(key, value.toString());
|
||||
}
|
||||
else {
|
||||
msg.setObjectProperty(key, value);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.amqp.messaging.Section;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||
|
||||
public JMSMappingInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerName() {
|
||||
return TRANSFORMER_JMS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InboundTransformer getFallbackTransformer() {
|
||||
return new AMQPNativeInboundTransformer(getVendor());
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||
|
||||
Message rc;
|
||||
final Section body = amqp.getBody();
|
||||
if (body == null) {
|
||||
rc = vendor.createMessage();
|
||||
}
|
||||
else if (body instanceof Data) {
|
||||
Binary d = ((Data) body).getValue();
|
||||
BytesMessage m = vendor.createBytesMessage();
|
||||
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
|
||||
rc = m;
|
||||
}
|
||||
else if (body instanceof AmqpSequence) {
|
||||
AmqpSequence sequence = (AmqpSequence) body;
|
||||
StreamMessage m = vendor.createStreamMessage();
|
||||
for (Object item : sequence.getValue()) {
|
||||
m.writeObject(item);
|
||||
}
|
||||
rc = m;
|
||||
}
|
||||
else if (body instanceof AmqpValue) {
|
||||
Object value = ((AmqpValue) body).getValue();
|
||||
if (value == null) {
|
||||
rc = vendor.createObjectMessage();
|
||||
}
|
||||
if (value instanceof String) {
|
||||
TextMessage m = vendor.createTextMessage();
|
||||
m.setText((String) value);
|
||||
rc = m;
|
||||
}
|
||||
else if (value instanceof Binary) {
|
||||
Binary d = (Binary) value;
|
||||
BytesMessage m = vendor.createBytesMessage();
|
||||
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
|
||||
rc = m;
|
||||
}
|
||||
else if (value instanceof List) {
|
||||
StreamMessage m = vendor.createStreamMessage();
|
||||
for (Object item : (List<Object>) value) {
|
||||
m.writeObject(item);
|
||||
}
|
||||
rc = m;
|
||||
}
|
||||
else if (value instanceof Map) {
|
||||
MapMessage m = vendor.createMapMessage();
|
||||
final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
|
||||
for (Map.Entry<String, Object> entry : set) {
|
||||
m.setObject(entry.getKey(), entry.getValue());
|
||||
}
|
||||
rc = m;
|
||||
}
|
||||
else {
|
||||
ObjectMessage m = vendor.createObjectMessage();
|
||||
m.setObject((Serializable) value);
|
||||
rc = m;
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw new RuntimeException("Unexpected body type: " + body.getClass());
|
||||
}
|
||||
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
||||
rc.setJMSPriority(defaultPriority);
|
||||
rc.setJMSExpiration(defaultTtl);
|
||||
|
||||
populateMessage(rc, amqp);
|
||||
|
||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
|
||||
return rc;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,329 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Footer;
|
||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
import org.apache.qpid.proton.amqp.messaging.Section;
|
||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||
import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageEOFException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
|
||||
public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
||||
|
||||
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
|
||||
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
|
||||
|
||||
public static final byte QUEUE_TYPE = 0x00;
|
||||
public static final byte TOPIC_TYPE = 0x01;
|
||||
public static final byte TEMP_QUEUE_TYPE = 0x02;
|
||||
public static final byte TEMP_TOPIC_TYPE = 0x03;
|
||||
|
||||
// Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
|
||||
|
||||
public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
|
||||
public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
|
||||
|
||||
public static final String LEGACY_QUEUE_TYPE = "queue";
|
||||
public static final String LEGACY_TOPIC_TYPE = "topic";
|
||||
public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
|
||||
public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
|
||||
|
||||
public JMSMappingOutboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the conversion between JMS Message and Proton Message without
|
||||
* re-encoding it to array. This is needed because some frameworks may elect
|
||||
* to do this on their own way (Netty for instance using Nettybuffers)
|
||||
*
|
||||
* @param msg
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
|
||||
Header header = new Header();
|
||||
Properties props = new Properties();
|
||||
HashMap<Symbol, Object> daMap = null;
|
||||
HashMap<Symbol, Object> maMap = null;
|
||||
HashMap apMap = null;
|
||||
Section body = null;
|
||||
HashMap footerMap = null;
|
||||
if (msg instanceof BytesMessage) {
|
||||
BytesMessage m = (BytesMessage) msg;
|
||||
byte[] data = new byte[(int) m.getBodyLength()];
|
||||
m.readBytes(data);
|
||||
m.reset(); // Need to reset after readBytes or future readBytes
|
||||
// calls (ex: redeliveries) will fail and return -1
|
||||
body = new Data(new Binary(data));
|
||||
}
|
||||
if (msg instanceof TextMessage) {
|
||||
body = new AmqpValue(((TextMessage) msg).getText());
|
||||
}
|
||||
if (msg instanceof MapMessage) {
|
||||
final HashMap<String, Object> map = new HashMap<String, Object>();
|
||||
final MapMessage m = (MapMessage) msg;
|
||||
final Enumeration<String> names = m.getMapNames();
|
||||
while (names.hasMoreElements()) {
|
||||
String key = names.nextElement();
|
||||
map.put(key, m.getObject(key));
|
||||
}
|
||||
body = new AmqpValue(map);
|
||||
}
|
||||
if (msg instanceof StreamMessage) {
|
||||
ArrayList<Object> list = new ArrayList<Object>();
|
||||
final StreamMessage m = (StreamMessage) msg;
|
||||
try {
|
||||
while (true) {
|
||||
list.add(m.readObject());
|
||||
}
|
||||
}
|
||||
catch (MessageEOFException e) {
|
||||
}
|
||||
body = new AmqpSequence(list);
|
||||
}
|
||||
if (msg instanceof ObjectMessage) {
|
||||
body = new AmqpValue(((ObjectMessage) msg).getObject());
|
||||
}
|
||||
|
||||
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
|
||||
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
|
||||
if (msg.getJMSType() != null) {
|
||||
props.setSubject(msg.getJMSType());
|
||||
}
|
||||
if (msg.getJMSMessageID() != null) {
|
||||
|
||||
String msgId = msg.getJMSMessageID();
|
||||
|
||||
if (msgId != null) {
|
||||
try {
|
||||
props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId));
|
||||
}
|
||||
catch (ActiveMQAMQPIllegalStateException e) {
|
||||
props.setMessageId(msgId);
|
||||
}
|
||||
}
|
||||
else {
|
||||
props.setMessageId(msgId.toString());
|
||||
}
|
||||
}
|
||||
if (msg.getJMSDestination() != null) {
|
||||
props.setTo(vendor.toAddress(msg.getJMSDestination()));
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
|
||||
|
||||
// Deprecated: used by legacy QPid AMQP 1.0 JMS client
|
||||
maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
|
||||
}
|
||||
if (msg.getJMSReplyTo() != null) {
|
||||
props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
|
||||
|
||||
// Deprecated: used by legacy QPid AMQP 1.0 JMS client
|
||||
maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
|
||||
}
|
||||
if (msg.getJMSCorrelationID() != null) {
|
||||
props.setCorrelationId(msg.getJMSCorrelationID());
|
||||
}
|
||||
if (msg.getJMSExpiration() != 0) {
|
||||
long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
|
||||
if (ttl < 0) {
|
||||
ttl = 1;
|
||||
}
|
||||
header.setTtl(new UnsignedInteger((int) ttl));
|
||||
|
||||
props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
|
||||
}
|
||||
if (msg.getJMSTimestamp() != 0) {
|
||||
props.setCreationTime(new Date(msg.getJMSTimestamp()));
|
||||
}
|
||||
|
||||
final Enumeration<String> keys = msg.getPropertyNames();
|
||||
while (keys.hasMoreElements()) {
|
||||
String key = keys.nextElement();
|
||||
if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
|
||||
// skip..
|
||||
}
|
||||
else if (key.equals(firstAcquirerKey)) {
|
||||
header.setFirstAcquirer(msg.getBooleanProperty(key));
|
||||
}
|
||||
else if (key.startsWith("JMSXDeliveryCount")) {
|
||||
// The AMQP delivery-count field only includes prior failed delivery attempts,
|
||||
// whereas JMSXDeliveryCount includes the first/current delivery attempt.
|
||||
int amqpDeliveryCount = msg.getIntProperty(key) - 1;
|
||||
if (amqpDeliveryCount > 0) {
|
||||
header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
|
||||
}
|
||||
}
|
||||
else if (key.startsWith("JMSXUserID")) {
|
||||
String value = msg.getStringProperty(key);
|
||||
props.setUserId(new Binary(value.getBytes("UTF-8")));
|
||||
}
|
||||
else if (key.startsWith("JMSXGroupID")) {
|
||||
String value = msg.getStringProperty(key);
|
||||
props.setGroupId(value);
|
||||
if (apMap == null) {
|
||||
apMap = new HashMap();
|
||||
}
|
||||
apMap.put(key, value);
|
||||
}
|
||||
else if (key.startsWith("JMSXGroupSeq")) {
|
||||
UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
|
||||
props.setGroupSequence(value);
|
||||
if (apMap == null) {
|
||||
apMap = new HashMap();
|
||||
}
|
||||
apMap.put(key, value);
|
||||
}
|
||||
else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
|
||||
if (daMap == null) {
|
||||
daMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
String name = key.substring(prefixDeliveryAnnotationsKey.length());
|
||||
daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
|
||||
}
|
||||
else if (key.startsWith(prefixMessageAnnotationsKey)) {
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
String name = key.substring(prefixMessageAnnotationsKey.length());
|
||||
maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
|
||||
}
|
||||
else if (key.equals(contentTypeKey)) {
|
||||
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||
}
|
||||
else if (key.equals(contentEncodingKey)) {
|
||||
props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||
}
|
||||
else if (key.equals(replyToGroupIDKey)) {
|
||||
props.setReplyToGroupId(msg.getStringProperty(key));
|
||||
}
|
||||
else if (key.startsWith(prefixFooterKey)) {
|
||||
if (footerMap == null) {
|
||||
footerMap = new HashMap();
|
||||
}
|
||||
String name = key.substring(prefixFooterKey.length());
|
||||
footerMap.put(name, msg.getObjectProperty(key));
|
||||
}
|
||||
else {
|
||||
if (apMap == null) {
|
||||
apMap = new HashMap();
|
||||
}
|
||||
apMap.put(key, msg.getObjectProperty(key));
|
||||
}
|
||||
}
|
||||
|
||||
MessageAnnotations ma = null;
|
||||
if (maMap != null) {
|
||||
ma = new MessageAnnotations(maMap);
|
||||
}
|
||||
DeliveryAnnotations da = null;
|
||||
if (daMap != null) {
|
||||
da = new DeliveryAnnotations(daMap);
|
||||
}
|
||||
ApplicationProperties ap = null;
|
||||
if (apMap != null) {
|
||||
ap = new ApplicationProperties(apMap);
|
||||
}
|
||||
Footer footer = null;
|
||||
if (footerMap != null) {
|
||||
footer = new Footer(footerMap);
|
||||
}
|
||||
|
||||
return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
|
||||
}
|
||||
|
||||
private static byte destinationType(Destination destination) {
|
||||
if (destination instanceof Queue) {
|
||||
if (destination instanceof TemporaryQueue) {
|
||||
return TEMP_QUEUE_TYPE;
|
||||
}
|
||||
else {
|
||||
return QUEUE_TYPE;
|
||||
}
|
||||
}
|
||||
else if (destination instanceof Topic) {
|
||||
if (destination instanceof TemporaryTopic) {
|
||||
return TEMP_TOPIC_TYPE;
|
||||
}
|
||||
else {
|
||||
return TOPIC_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
|
||||
}
|
||||
|
||||
// Used by legacy QPid AMQP 1.0 JMS client.
|
||||
@Deprecated
|
||||
private static String destinationAttributes(Destination destination) {
|
||||
if (destination instanceof Queue) {
|
||||
if (destination instanceof TemporaryQueue) {
|
||||
return LEGACY_TEMP_QUEUE_TYPE;
|
||||
}
|
||||
else {
|
||||
return LEGACY_QUEUE_TYPE;
|
||||
}
|
||||
}
|
||||
else if (destination instanceof Topic) {
|
||||
if (destination instanceof TemporaryTopic) {
|
||||
return LEGACY_TEMP_TOPIC_TYPE;
|
||||
}
|
||||
else {
|
||||
return LEGACY_TOPIC_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
public interface JMSVendor {
|
||||
|
||||
BytesMessage createBytesMessage();
|
||||
|
||||
StreamMessage createStreamMessage();
|
||||
|
||||
Message createMessage();
|
||||
|
||||
TextMessage createTextMessage();
|
||||
|
||||
ObjectMessage createObjectMessage();
|
||||
|
||||
MapMessage createMapMessage();
|
||||
|
||||
void setJMSXUserID(Message message, String value);
|
||||
|
||||
Destination createDestination(String name);
|
||||
|
||||
void setJMSXGroupID(Message message, String groupId);
|
||||
|
||||
void setJMSXGroupSequence(Message message, int value);
|
||||
|
||||
void setJMSXDeliveryCount(Message message, long value);
|
||||
|
||||
String toAddress(Destination destination);
|
||||
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton.converter.message;
|
||||
|
||||
public abstract class OutboundTransformer {
|
||||
|
||||
JMSVendor vendor;
|
||||
String prefixVendor;
|
||||
|
||||
String prefixDeliveryAnnotations = "DA_";
|
||||
String prefixMessageAnnotations = "MA_";
|
||||
String prefixFooter = "FT_";
|
||||
|
||||
String messageFormatKey;
|
||||
String nativeKey;
|
||||
String firstAcquirerKey;
|
||||
String prefixDeliveryAnnotationsKey;
|
||||
String prefixMessageAnnotationsKey;
|
||||
String contentTypeKey;
|
||||
String contentEncodingKey;
|
||||
String replyToGroupIDKey;
|
||||
String prefixFooterKey;
|
||||
|
||||
public OutboundTransformer(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
this.setPrefixVendor("JMS_AMQP_");
|
||||
}
|
||||
|
||||
public String getPrefixVendor() {
|
||||
return prefixVendor;
|
||||
}
|
||||
|
||||
public void setPrefixVendor(String prefixVendor) {
|
||||
this.prefixVendor = prefixVendor;
|
||||
|
||||
messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
|
||||
nativeKey = prefixVendor + "NATIVE";
|
||||
firstAcquirerKey = prefixVendor + "FirstAcquirer";
|
||||
prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
|
||||
prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
|
||||
contentTypeKey = prefixVendor + "ContentType";
|
||||
contentEncodingKey = prefixVendor + "ContentEncoding";
|
||||
replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
|
||||
prefixFooterKey = prefixVendor + prefixFooter;
|
||||
|
||||
}
|
||||
|
||||
public JMSVendor getVendor() {
|
||||
return vendor;
|
||||
}
|
||||
|
||||
public void setVendor(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
}
|
||||
}
|
|
@ -20,11 +20,11 @@ import java.util.concurrent.Executor;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.util.Map;
|
|||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
|
|
12
pom.xml
12
pom.xml
|
@ -413,18 +413,6 @@
|
|||
<version>${activemq5-version}</version>
|
||||
<!-- License: Apache 2.0 -->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>activemq-amqp</artifactId>
|
||||
<version>${activemq5-version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-jms_1.1_spec</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
<!-- License: Apache 2.0 -->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
Loading…
Reference in New Issue