mirror of https://github.com/apache/activemq.git
Encode the incoming messageId value into a string using type prefixes and decode them on the way out to ensure that we preserve the original AMQP MessageId type and value.
This commit is contained in:
parent
8031d77f98
commit
4d6f4d7475
|
@ -0,0 +1,255 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.transport.amqp.AmqpProtocolException;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
|
||||
/**
|
||||
* Helper class for identifying and converting message-id and correlation-id values between
|
||||
* the AMQP types and the Strings values used by JMS.
|
||||
*
|
||||
* <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
|
||||
* message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
|
||||
* for interoperability with other AMQP clients, the following encoding can be used after removing or
|
||||
* before adding the "ID:" prefix used for a JMSMessageID value:<br>
|
||||
*
|
||||
* {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
|
||||
* {@literal "AMQP_UUID:<string representation of uuid>"}<br>
|
||||
* {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
|
||||
* {@literal "AMQP_STRING:<string>"}<br>
|
||||
*
|
||||
* <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
|
||||
* with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
|
||||
*
|
||||
* <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
|
||||
* ulong but can't be converted into the indicated format, an exception will be thrown.
|
||||
*
|
||||
*/
|
||||
public class AMQPMessageIdHelper {
|
||||
|
||||
public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
|
||||
|
||||
public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
|
||||
public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
|
||||
public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
|
||||
public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
|
||||
|
||||
private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
|
||||
private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
|
||||
private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
|
||||
private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
|
||||
private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
|
||||
|
||||
/**
|
||||
* Takes the provided AMQP messageId style object, and convert it to a base string.
|
||||
* Encodes type information as a prefix where necessary to convey or escape the type
|
||||
* of the provided object.
|
||||
*
|
||||
* @param messageId
|
||||
* the raw messageId object to process
|
||||
*
|
||||
* @return the base string to be used in creating the actual id.
|
||||
*/
|
||||
public String toBaseMessageIdString(Object messageId) {
|
||||
if (messageId == null) {
|
||||
return null;
|
||||
} else if (messageId instanceof String) {
|
||||
String stringId = (String) messageId;
|
||||
|
||||
// If the given string has a type encoding prefix,
|
||||
// we need to escape it as an encoded string (even if
|
||||
// the existing encoding prefix was also for string)
|
||||
if (hasTypeEncodingPrefix(stringId)) {
|
||||
return AMQP_STRING_PREFIX + stringId;
|
||||
} else {
|
||||
return stringId;
|
||||
}
|
||||
} else if (messageId instanceof UUID) {
|
||||
return AMQP_UUID_PREFIX + messageId.toString();
|
||||
} else if (messageId instanceof UnsignedLong) {
|
||||
return AMQP_ULONG_PREFIX + messageId.toString();
|
||||
} else if (messageId instanceof Binary) {
|
||||
ByteBuffer dup = ((Binary) messageId).asByteBuffer();
|
||||
|
||||
byte[] bytes = new byte[dup.remaining()];
|
||||
dup.get(bytes);
|
||||
|
||||
String hex = convertBinaryToHexString(bytes);
|
||||
|
||||
return AMQP_BINARY_PREFIX + hex;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the provided base id string and return the appropriate amqp messageId style object.
|
||||
* Converts the type based on any relevant encoding information found as a prefix.
|
||||
*
|
||||
* @param baseId
|
||||
* the object to be converted to an AMQP MessageId value.
|
||||
*
|
||||
* @return the AMQP messageId style object
|
||||
*
|
||||
* @throws AmqpProtocolException if the provided baseId String indicates an encoded type but can't be converted to that type.
|
||||
*/
|
||||
public Object toIdObject(String baseId) throws AmqpProtocolException {
|
||||
if (baseId == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (hasAmqpUuidPrefix(baseId)) {
|
||||
String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
|
||||
return UUID.fromString(uuidString);
|
||||
} else if (hasAmqpUlongPrefix(baseId)) {
|
||||
String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
|
||||
return UnsignedLong.valueOf(longString);
|
||||
} else if (hasAmqpStringPrefix(baseId)) {
|
||||
return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
|
||||
} else if (hasAmqpBinaryPrefix(baseId)) {
|
||||
String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
|
||||
byte[] bytes = convertHexStringToBinary(hexString);
|
||||
return new Binary(bytes);
|
||||
} else {
|
||||
// We have a string without any type prefix, transmit it as-is.
|
||||
return baseId;
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new AmqpProtocolException("Unable to convert ID value");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the provided hex-string into a binary representation where each byte represents
|
||||
* two characters of the hex string.
|
||||
*
|
||||
* The hex characters may be upper or lower case.
|
||||
*
|
||||
* @param hexString
|
||||
* string to convert to a binary value.
|
||||
*
|
||||
* @return a byte array containing the binary representation
|
||||
*
|
||||
* @throws IllegalArgumentException if the provided String is a non-even length or contains
|
||||
* non-hex characters
|
||||
*/
|
||||
public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
|
||||
int length = hexString.length();
|
||||
|
||||
// As each byte needs two characters in the hex encoding, the string must be an even length.
|
||||
if (length % 2 != 0) {
|
||||
throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
|
||||
}
|
||||
|
||||
byte[] binary = new byte[length / 2];
|
||||
|
||||
for (int i = 0; i < length; i += 2) {
|
||||
char highBitsChar = hexString.charAt(i);
|
||||
char lowBitsChar = hexString.charAt(i + 1);
|
||||
|
||||
int highBits = hexCharToInt(highBitsChar, hexString) << 4;
|
||||
int lowBits = hexCharToInt(lowBitsChar, hexString);
|
||||
|
||||
binary[i / 2] = (byte) (highBits + lowBits);
|
||||
}
|
||||
|
||||
return binary;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the provided binary into a hex-string representation where each character
|
||||
* represents 4 bits of the provided binary, i.e each byte requires two characters.
|
||||
*
|
||||
* The returned hex characters are upper-case.
|
||||
*
|
||||
* @param bytes
|
||||
* the binary value to convert to a hex String instance.
|
||||
*
|
||||
* @return a String containing a hex representation of the bytes
|
||||
*/
|
||||
public String convertBinaryToHexString(byte[] bytes) {
|
||||
// Each byte is represented as 2 chars
|
||||
StringBuilder builder = new StringBuilder(bytes.length * 2);
|
||||
|
||||
for (byte b : bytes) {
|
||||
// The byte will be expanded to int before shifting, replicating the
|
||||
// sign bit, so mask everything beyond the first 4 bits afterwards
|
||||
int highBitsInt = (b >> 4) & 0xF;
|
||||
// We only want the first 4 bits
|
||||
int lowBitsInt = b & 0xF;
|
||||
|
||||
builder.append(HEX_CHARS[highBitsInt]);
|
||||
builder.append(HEX_CHARS[lowBitsInt]);
|
||||
}
|
||||
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
//----- Internal implementation ------------------------------------------//
|
||||
|
||||
private boolean hasTypeEncodingPrefix(String stringId) {
|
||||
return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
|
||||
hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
|
||||
}
|
||||
|
||||
private boolean hasAmqpStringPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_STRING_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUlongPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_ULONG_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUuidPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_UUID_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpBinaryPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_BINARY_PREFIX);
|
||||
}
|
||||
|
||||
private String strip(String id, int numChars) {
|
||||
return id.substring(numChars);
|
||||
}
|
||||
|
||||
private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
|
||||
if (ch >= '0' && ch <= '9') {
|
||||
// subtract '0' to get difference in position as an int
|
||||
return ch - '0';
|
||||
} else if (ch >= 'A' && ch <= 'F') {
|
||||
// subtract 'A' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'A'
|
||||
return ch - 'A' + 10;
|
||||
} else if (ch >= 'a' && ch <= 'f') {
|
||||
// subtract 'a' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'a'
|
||||
return ch - 'a' + 10;
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -118,14 +118,17 @@ public abstract class InboundTransformer {
|
|||
} else {
|
||||
jms.setJMSDeliveryMode(defaultDeliveryMode);
|
||||
}
|
||||
|
||||
if (header.getPriority() != null) {
|
||||
jms.setJMSPriority(header.getPriority().intValue());
|
||||
} else {
|
||||
jms.setJMSPriority(defaultPriority);
|
||||
}
|
||||
|
||||
if (header.getFirstAcquirer() != null) {
|
||||
jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
||||
}
|
||||
|
||||
if (header.getDeliveryCount() != null) {
|
||||
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
|
||||
}
|
||||
|
@ -188,7 +191,7 @@ public abstract class InboundTransformer {
|
|||
final Properties properties = amqp.getProperties();
|
||||
if (properties != null) {
|
||||
if (properties.getMessageId() != null) {
|
||||
jms.setJMSMessageID(properties.getMessageId().toString());
|
||||
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
|
||||
}
|
||||
Binary userId = properties.getUserId();
|
||||
if (userId != null) {
|
||||
|
@ -236,6 +239,7 @@ public abstract class InboundTransformer {
|
|||
if (header.getTtl() != null) {
|
||||
ttl = header.getTtl().longValue();
|
||||
}
|
||||
|
||||
if (ttl == 0) {
|
||||
jms.setJMSExpiration(0);
|
||||
} else {
|
||||
|
|
|
@ -41,6 +41,7 @@ import javax.jms.Topic;
|
|||
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.transport.amqp.AmqpProtocolException;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||
|
@ -180,7 +181,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
|||
|
||||
MessageId msgId = amqMsg.getMessageId();
|
||||
if (msgId.getTextView() != null) {
|
||||
props.setMessageId(msgId.getTextView());
|
||||
try {
|
||||
props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
|
||||
} catch (AmqpProtocolException e) {
|
||||
props.setMessageId(msgId.getTextView().toString());
|
||||
}
|
||||
} else {
|
||||
props.setMessageId(msgId.toString());
|
||||
}
|
||||
|
|
|
@ -198,6 +198,32 @@ public class AmqpMessage {
|
|||
return message.getProperties().getMessageId().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the set MessageId value in the original form, if there are no properties
|
||||
* in the given message return null.
|
||||
*
|
||||
* @return the set message ID in its original form or null if not set.
|
||||
*/
|
||||
public Object getRawMessageId() {
|
||||
if (message.getProperties() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return message.getProperties().getMessageId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the MessageId property on an outbound message using the provided value
|
||||
*
|
||||
* @param messageId
|
||||
* the message ID value to set.
|
||||
*/
|
||||
public void setRawMessageId(Object messageId) {
|
||||
checkReadOnly();
|
||||
lazyCreateProperties();
|
||||
getWrappedMessage().setMessageId(messageId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the GroupId property on an outbound message using the provided String
|
||||
*
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.interop;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests that the AMQP MessageID value and type are preserved.
|
||||
*/
|
||||
public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
|
||||
|
||||
@Override
|
||||
protected boolean isPersistent() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testStringMessageIdIsPreserved() throws Exception {
|
||||
doTestMessageIdPreservation("msg-id-string:1");
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testStringMessageIdIsPreservedAfterRestart() throws Exception {
|
||||
doTestMessageIdPreservationOnBrokerRestart("msg-id-string:1");
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUUIDMessageIdIsPreserved() throws Exception {
|
||||
doTestMessageIdPreservation(UUID.randomUUID());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUUIDMessageIdIsPreservedAfterRestart() throws Exception {
|
||||
doTestMessageIdPreservationOnBrokerRestart(UUID.randomUUID());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUnsignedLongMessageIdIsPreserved() throws Exception {
|
||||
doTestMessageIdPreservation(new UnsignedLong(255l));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUnsignedLongMessageIdIsPreservedAfterRestart() throws Exception {
|
||||
doTestMessageIdPreservationOnBrokerRestart(new UnsignedLong(255l));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testBinaryLongMessageIdIsPreserved() throws Exception {
|
||||
byte[] payload = new byte[32];
|
||||
for (int i = 0; i < 32; ++i) {
|
||||
payload[i] = (byte) ('a' + i);
|
||||
}
|
||||
|
||||
doTestMessageIdPreservation(new Binary(payload));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testBinaryLongMessageIdIsPreservedAfterRestart() throws Exception {
|
||||
byte[] payload = new byte[32];
|
||||
for (int i = 0; i < 32; ++i) {
|
||||
payload[i] = (byte) ('a' + i);
|
||||
}
|
||||
|
||||
doTestMessageIdPreservationOnBrokerRestart(new Binary(payload));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testStringMessageIdPrefixIsPreserved() throws Exception {
|
||||
doTestMessageIdPreservation("ID:msg-id-string:1");
|
||||
}
|
||||
|
||||
public void doTestMessageIdPreservation(Object messageId) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
|
||||
message.setRawMessageId(messageId);
|
||||
message.setText("Test-Message");
|
||||
|
||||
sender.send(message);
|
||||
|
||||
sender.close();
|
||||
|
||||
QueueViewMBean queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have got a message", received);
|
||||
assertEquals(received.getRawMessageId().getClass(), messageId.getClass());
|
||||
assertEquals(messageId, received.getRawMessageId());
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
public void doTestMessageIdPreservationOnBrokerRestart(Object messageId) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = client.connect();
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
|
||||
message.setRawMessageId(messageId);
|
||||
message.setText("Test-Message");
|
||||
message.setDurable(true);
|
||||
|
||||
sender.send(message);
|
||||
|
||||
sender.close();
|
||||
connection.close();
|
||||
|
||||
restartBroker();
|
||||
|
||||
QueueViewMBean queue = getProxyToQueue(getTestName());
|
||||
assertEquals(1, queue.getQueueSize());
|
||||
|
||||
connection = client.connect();
|
||||
session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have got a message", received);
|
||||
assertEquals(received.getRawMessageId().getClass(), messageId.getClass());
|
||||
assertEquals(messageId, received.getRawMessageId());
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,406 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.transport.amqp.AmqpProtocolException;
|
||||
import org.apache.qpid.jms.exceptions.IdConversionException;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AMQPMessageIdHelperTest {
|
||||
|
||||
private AMQPMessageIdHelper messageIdHelper;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
messageIdHelper = new AMQPMessageIdHelper();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns null if given null
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithNull() {
|
||||
String nullString = null;
|
||||
assertNull("null string should have been returned", messageIdHelper.toBaseMessageIdString(nullString));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* throws an IAE if given an unexpected object type.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringThrowsIAEWithUnexpectedType() {
|
||||
try {
|
||||
messageIdHelper.toBaseMessageIdString(new Object());
|
||||
fail("expected exception not thrown");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns the given basic string unchanged
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithString() {
|
||||
String stringMessageId = "myIdString";
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(stringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", stringMessageId, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded string, when the given string
|
||||
* happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForUUID() {
|
||||
String uuidStringMessageId = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + uuidStringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uuidStringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded string, when the given string
|
||||
* happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForLong() {
|
||||
String longStringMessageId = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + Long.valueOf(123456789L);
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + longStringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(longStringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded string, when the given string
|
||||
* happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForBinary() {
|
||||
String binaryStringMessageId = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "0123456789ABCDEF";
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + binaryStringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(binaryStringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded string (effectively twice),
|
||||
* when the given string happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForString() {
|
||||
String stringMessageId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + "myStringId";
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + stringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(stringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded UUID when given a UUID
|
||||
* object.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithUUID() {
|
||||
UUID uuidMessageId = UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidMessageId.toString();
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uuidMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded ulong when given a
|
||||
* UnsignedLong object.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithUnsignedLong() {
|
||||
UnsignedLong uLongMessageId = UnsignedLong.valueOf(123456789L);
|
||||
String expected = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongMessageId.toString();
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uLongMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded binary when given a Binary
|
||||
* object.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithBinary() {
|
||||
byte[] bytes = new byte[] { (byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF };
|
||||
Binary binary = new Binary(bytes);
|
||||
|
||||
String expected = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(binary);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns an
|
||||
* UnsignedLong when given a string indicating an encoded AMQP ulong id.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithEncodedUlong() throws Exception {
|
||||
UnsignedLong longId = UnsignedLong.valueOf(123456789L);
|
||||
String provided = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "123456789";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", longId, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
|
||||
* when given a string indicating an encoded AMQP binary id, using upper
|
||||
* case hex characters
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithEncodedBinaryUppercaseHexString() throws Exception {
|
||||
byte[] bytes = new byte[] { (byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF };
|
||||
Binary binaryId = new Binary(bytes);
|
||||
|
||||
String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", binaryId, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns null
|
||||
* when given null.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithNull() throws Exception {
|
||||
assertNull("null object should have been returned", messageIdHelper.toIdObject(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
|
||||
* when given a string indicating an encoded AMQP binary id, using lower
|
||||
* case hex characters.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithEncodedBinaryLowercaseHexString() throws Exception {
|
||||
byte[] bytes = new byte[] { (byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF };
|
||||
Binary binaryId = new Binary(bytes);
|
||||
|
||||
String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00ab09ff";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", binaryId, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a UUID
|
||||
* when given a string indicating an encoded AMQP uuid id.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithEncodedUuid() throws Exception {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
String provided = AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuid.toString();
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", uuid, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string
|
||||
* when given a string without any type encoding prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingNoEncodingPrefix() throws Exception {
|
||||
String stringId = "myStringId";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(stringId);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", stringId, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the
|
||||
* remainder of the provided string after removing the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingStringEncodingPrefix() throws Exception {
|
||||
String suffix = "myStringSuffix";
|
||||
String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + suffix;
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(stringId);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", suffix, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when given a string with with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix and then
|
||||
* additionally the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}, the
|
||||
* {@link AMQPMessageIdHelper#toIdObject(String)} method returns the
|
||||
* remainder of the provided string after removing the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingStringEncodingPrefixAndThenUuidPrefix() throws Exception {
|
||||
String encodedUuidString = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID().toString();
|
||||
String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + encodedUuidString;
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(stringId);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", encodedUuidString, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
|
||||
* {@link IdConversionException} when presented with an encoded binary hex
|
||||
* string of uneven length (after the prefix) that thus can't be converted
|
||||
* due to each byte using 2 characters
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingBinaryHexThrowsWithUnevenLengthString() {
|
||||
String unevenHead = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "123";
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(unevenHead);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (AmqpProtocolException ex) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
|
||||
* {@link IdConversionException} when presented with an encoded binary hex
|
||||
* string (after the prefix) that contains characters other than 0-9 and A-F
|
||||
* and a-f, and thus can't be converted
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingBinaryHexThrowsWithNonHexCharacters() {
|
||||
|
||||
// char before '0'
|
||||
char nonHexChar = '/';
|
||||
String nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (AmqpProtocolException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// char after '9', before 'A'
|
||||
nonHexChar = ':';
|
||||
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (AmqpProtocolException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// char after 'F', before 'a'
|
||||
nonHexChar = 'G';
|
||||
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (AmqpProtocolException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// char after 'f'
|
||||
nonHexChar = 'g';
|
||||
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (AmqpProtocolException ex) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue