From 4b50d6c43146c324b6f854bfac07e37e1c17c762 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 17 Mar 2017 17:19:53 -0400 Subject: [PATCH] ARTEMIS-1048 Fixing Unsigned types and Selectors --- .../protocol/amqp/broker/AMQPMessage.java | 13 ++++- .../amqp/AmqpReceiverWithFiltersTest.java | 49 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 653ee5f77a..c1c676cf19 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -37,7 +37,10 @@ import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.UnsignedShort; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; @@ -719,7 +722,15 @@ public class AMQPMessage extends RefCountMessage { } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) { return getConnectionID(); } else { - return getApplicationPropertiesMap().get(key); + Object value = getApplicationPropertiesMap().get(key); + if (value instanceof UnsignedInteger || + value instanceof UnsignedByte || + value instanceof UnsignedLong || + value instanceof UnsignedShort) { + return ((Number)value).longValue(); + } else { + return value; + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java index 2c2438292c..f8f726a570 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverWithFiltersTest.java @@ -18,21 +18,28 @@ package org.apache.activemq.artemis.tests.integration.amqp; import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Receiver; +import org.junit.Assert; import org.junit.Test; /** @@ -114,4 +121,46 @@ public class AmqpReceiverWithFiltersTest extends AmqpClientTestSupport { connection.getStateInspector().assertValid(); connection.close(); } + + + @Test(timeout = 60000) + public void testReceivedUnsignedFilter() throws Exception { + final int NUM_MESSAGES = 100; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + + try { + + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); + + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("myNewID", new UnsignedInteger(i)); + sender.send(message); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver(getTestName(), "myNewID < " + (NUM_MESSAGES / 2)); + ArrayList messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(message); + System.out.println("Read message: " + message.getApplicationProperty("myNewID")); + assertNotNull(message); + messages.add(message); + } + + Assert.assertNull(receiver.receiveNoWait()); + + } finally { + connection.close(); + } + } + + }