From 2ebf3c8e1b90558987d7f169c11f4e7560cf72cf Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 5 Jun 2019 16:10:38 -0400 Subject: [PATCH] ARTEMIS-2372 Filtering on Message Annotations --- .../activemq/artemis/api/core/Message.java | 4 ++ .../protocol/amqp/broker/AMQPMessage.java | 13 ++++ .../artemis/core/filter/impl/FilterImpl.java | 2 +- .../amqp/AmqpExpiredMessageTest.java | 69 ++++++++++++++++++- .../integration/amqp/AmqpSendReceiveTest.java | 39 +++++++++++ 5 files changed, 123 insertions(+), 4 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index a8b54c7aa7..568cdda65b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -614,6 +614,10 @@ public interface Message { Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException; + default Object getObjectPropertyForFilter(SimpleString key) { + return getObjectProperty(key); + } + Object getObjectProperty(SimpleString key); default Object removeAnnotation(SimpleString key) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 64fec6e63f..e0cd94b17e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -273,6 +273,19 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return protonMessage; } + @Override + public Object getObjectPropertyForFilter(SimpleString key) { + Object value = getObjectProperty(key); + if (value == null) { + value = getMessageAnnotation(key.toString()); + } + if (value == null) { + value = getExtraBytesProperty(key); + } + + return value; + } + /** * Returns a copy of the message Header if one is present, changes to the returned * Header instance do not affect the original Message. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index da36d6ff7e..288a91cfe7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -239,7 +239,7 @@ public class FilterImpl implements Filter { result = bytes == null ? null : ByteUtil.bytesToInt(bytes); } if (result == null) { - result = message.getObjectProperty(id); + result = message.getObjectPropertyForFilter(id); } if (result != null) { if (result.getClass() == SimpleString.class) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 759a854f1b..54458b2010 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; import org.junit.Test; public class AmqpExpiredMessageTest extends AmqpClientTestSupport { @@ -120,6 +121,68 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { connection.close(); } + /** This test is validating a broker feature where the message copy through the DLQ will receive an annotation. + * It is also testing filter on that annotation. */ + @Test(timeout = 60000) + public void testExpiryThroughTTLValidateAnnotation() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(1); + message.setText("Test-Message"); + message.setDurable(true); + message.setApplicationProperty("key1", "Value1"); + sender.send(message); + sender.close(); + + Thread.sleep(100); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); + AmqpMessage received = receiver.receiveNoWait(); + assertNull(received); + + Wait.assertEquals(1, queueView::getMessagesExpired); + + connection.close(); + + // This will stop and start the server + // to make sure the message is decoded again from its binary format + // avoiding any parsing cached at the server. + server.stop(); + server.start(); + + final Queue dlqView = getProxyToQueue(getDeadLetterAddress()); + assertNotNull(dlqView); + Wait.assertEquals(1, dlqView::getMessageCount); + + client = createAmqpClient(); + connection = addConnection(client.connect()); + session = connection.createSession(); + + AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'"); + receiverDLQ.flow(1); + received = receiverDLQ.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(received); + received.accept(); + + assertNotNull("Should have read message from DLQ", received); + assertEquals(0, received.getTimeToLive()); + assertNotNull(received); + assertEquals("Value1", received.getApplicationProperty("key1")); + + connection.close(); + } + @Test(timeout = 60000) public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception { AmqpClient client = createAmqpClient(); @@ -272,7 +335,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - assertEquals(1, queueView.getMessageCount()); + Wait.assertEquals(1, queueView::getMessageCount); // Now try and get the message AmqpReceiver receiver = session.createReceiver(getQueueName()); @@ -280,7 +343,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(received); - assertEquals(0, queueView.getMessagesExpired()); + Wait.assertEquals(0, queueView::getMessagesExpired); connection.close(); } @@ -305,7 +368,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { Thread.sleep(50); - assertEquals(1, queueView.getMessageCount()); + Wait.assertEquals(1, queueView::getMessageCount); // Now try and get the message AmqpReceiver receiver = session.createReceiver(getQueueName()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index cb1db9246e..85c304c02f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -384,6 +384,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testSendFilterAnnotation() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + 1); + message.setMessageAnnotation("serialNo", 1); + message.setText("Test-Message"); + sender.send(message); + + message = new AmqpMessage(); + message.setMessageId("msg" + 2); + message.setMessageAnnotation("serialNo", 2); + message.setText("Test-Message 2"); + sender.send(message); + sender.close(); + + LOG.debug("Attempting to read message with receiver"); + AmqpReceiver receiver = session.createReceiver(getQueueName(), "serialNo=2"); + receiver.flow(2); + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received); + assertEquals("msg2", received.getMessageId()); + received.accept(); + + Assert.assertNull(receiver.receiveNoWait()); + + receiver.close(); + + connection.close(); + } + + @Test(timeout = 60000) public void testCloseBusyReceiver() throws Exception { final int MSG_COUNT = 20;