ARTEMIS-2372 Filtering on Message Annotations
This commit is contained in:
parent
ac9ebc6877
commit
2ebf3c8e1b
|
@ -614,6 +614,10 @@ public interface Message {
|
||||||
|
|
||||||
Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
|
Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
|
||||||
|
|
||||||
|
default Object getObjectPropertyForFilter(SimpleString key) {
|
||||||
|
return getObjectProperty(key);
|
||||||
|
}
|
||||||
|
|
||||||
Object getObjectProperty(SimpleString key);
|
Object getObjectProperty(SimpleString key);
|
||||||
|
|
||||||
default Object removeAnnotation(SimpleString key) {
|
default Object removeAnnotation(SimpleString key) {
|
||||||
|
|
|
@ -273,6 +273,19 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
return protonMessage;
|
return protonMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getObjectPropertyForFilter(SimpleString key) {
|
||||||
|
Object value = getObjectProperty(key);
|
||||||
|
if (value == null) {
|
||||||
|
value = getMessageAnnotation(key.toString());
|
||||||
|
}
|
||||||
|
if (value == null) {
|
||||||
|
value = getExtraBytesProperty(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a copy of the message Header if one is present, changes to the returned
|
* Returns a copy of the message Header if one is present, changes to the returned
|
||||||
* Header instance do not affect the original Message.
|
* Header instance do not affect the original Message.
|
||||||
|
|
|
@ -239,7 +239,7 @@ public class FilterImpl implements Filter {
|
||||||
result = bytes == null ? null : ByteUtil.bytesToInt(bytes);
|
result = bytes == null ? null : ByteUtil.bytesToInt(bytes);
|
||||||
}
|
}
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
result = message.getObjectProperty(id);
|
result = message.getObjectPropertyForFilter(id);
|
||||||
}
|
}
|
||||||
if (result != null) {
|
if (result != null) {
|
||||||
if (result.getClass() == SimpleString.class) {
|
if (result.getClass() == SimpleString.class) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
||||||
|
@ -120,6 +121,68 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** This test is validating a broker feature where the message copy through the DLQ will receive an annotation.
|
||||||
|
* It is also testing filter on that annotation. */
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testExpiryThroughTTLValidateAnnotation() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getQueueName());
|
||||||
|
|
||||||
|
// Get the Queue View early to avoid racing the delivery.
|
||||||
|
final Queue queueView = getProxyToQueue(getQueueName());
|
||||||
|
assertNotNull(queueView);
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setTimeToLive(1);
|
||||||
|
message.setText("Test-Message");
|
||||||
|
message.setDurable(true);
|
||||||
|
message.setApplicationProperty("key1", "Value1");
|
||||||
|
sender.send(message);
|
||||||
|
sender.close();
|
||||||
|
|
||||||
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
// Now try and get the message
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage received = receiver.receiveNoWait();
|
||||||
|
assertNull(received);
|
||||||
|
|
||||||
|
Wait.assertEquals(1, queueView::getMessagesExpired);
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
// This will stop and start the server
|
||||||
|
// to make sure the message is decoded again from its binary format
|
||||||
|
// avoiding any parsing cached at the server.
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
|
||||||
|
assertNotNull(dlqView);
|
||||||
|
Wait.assertEquals(1, dlqView::getMessageCount);
|
||||||
|
|
||||||
|
client = createAmqpClient();
|
||||||
|
connection = addConnection(client.connect());
|
||||||
|
session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'");
|
||||||
|
receiverDLQ.flow(1);
|
||||||
|
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
|
||||||
|
Assert.assertNotNull(received);
|
||||||
|
received.accept();
|
||||||
|
|
||||||
|
assertNotNull("Should have read message from DLQ", received);
|
||||||
|
assertEquals(0, received.getTimeToLive());
|
||||||
|
assertNotNull(received);
|
||||||
|
assertEquals("Value1", received.getApplicationProperty("key1"));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
|
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
|
@ -272,7 +335,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
||||||
sender.send(message);
|
sender.send(message);
|
||||||
sender.close();
|
sender.close();
|
||||||
|
|
||||||
assertEquals(1, queueView.getMessageCount());
|
Wait.assertEquals(1, queueView::getMessageCount);
|
||||||
|
|
||||||
// Now try and get the message
|
// Now try and get the message
|
||||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||||
|
@ -280,7 +343,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
||||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
|
|
||||||
assertEquals(0, queueView.getMessagesExpired());
|
Wait.assertEquals(0, queueView::getMessagesExpired);
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
@ -305,7 +368,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
|
|
||||||
assertEquals(1, queueView.getMessageCount());
|
Wait.assertEquals(1, queueView::getMessageCount);
|
||||||
|
|
||||||
// Now try and get the message
|
// Now try and get the message
|
||||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||||
|
|
|
@ -384,6 +384,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendFilterAnnotation() throws Exception {
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getQueueName());
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
|
||||||
|
message.setMessageId("msg" + 1);
|
||||||
|
message.setMessageAnnotation("serialNo", 1);
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
message = new AmqpMessage();
|
||||||
|
message.setMessageId("msg" + 2);
|
||||||
|
message.setMessageAnnotation("serialNo", 2);
|
||||||
|
message.setText("Test-Message 2");
|
||||||
|
sender.send(message);
|
||||||
|
sender.close();
|
||||||
|
|
||||||
|
LOG.debug("Attempting to read message with receiver");
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getQueueName(), "serialNo=2");
|
||||||
|
receiver.flow(2);
|
||||||
|
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
|
||||||
|
assertNotNull("Should have read message", received);
|
||||||
|
assertEquals("msg2", received.getMessageId());
|
||||||
|
received.accept();
|
||||||
|
|
||||||
|
Assert.assertNull(receiver.receiveNoWait());
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testCloseBusyReceiver() throws Exception {
|
public void testCloseBusyReceiver() throws Exception {
|
||||||
final int MSG_COUNT = 20;
|
final int MSG_COUNT = 20;
|
||||||
|
|
Loading…
Reference in New Issue