This commit is contained in:
Clebert Suconic 2020-04-28 11:23:21 -04:00
commit 1a83b13c80
5 changed files with 123 additions and 4 deletions

View File

@ -614,6 +614,10 @@ public interface Message {
Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException; Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
default Object getObjectPropertyForFilter(SimpleString key) {
return getObjectProperty(key);
}
Object getObjectProperty(SimpleString key); Object getObjectProperty(SimpleString key);
default Object removeAnnotation(SimpleString key) { default Object removeAnnotation(SimpleString key) {

View File

@ -273,6 +273,19 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return protonMessage; return protonMessage;
} }
@Override
public Object getObjectPropertyForFilter(SimpleString key) {
Object value = getObjectProperty(key);
if (value == null) {
value = getMessageAnnotation(key.toString());
}
if (value == null) {
value = getExtraBytesProperty(key);
}
return value;
}
/** /**
* Returns a copy of the message Header if one is present, changes to the returned * Returns a copy of the message Header if one is present, changes to the returned
* Header instance do not affect the original Message. * Header instance do not affect the original Message.

View File

@ -239,7 +239,7 @@ public class FilterImpl implements Filter {
result = bytes == null ? null : ByteUtil.bytesToInt(bytes); result = bytes == null ? null : ByteUtil.bytesToInt(bytes);
} }
if (result == null) { if (result == null) {
result = message.getObjectProperty(id); result = message.getObjectPropertyForFilter(id);
} }
if (result != null) { if (result != null) {
if (result.getClass() == SimpleString.class) { if (result.getClass() == SimpleString.class) {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class AmqpExpiredMessageTest extends AmqpClientTestSupport { public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@ -120,6 +121,68 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
/** This test is validating a broker feature where the message copy through the DLQ will receive an annotation.
* It is also testing filter on that annotation. */
@Test(timeout = 60000)
public void testExpiryThroughTTLValidateAnnotation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(1);
message.setText("Test-Message");
message.setDurable(true);
message.setApplicationProperty("key1", "Value1");
sender.send(message);
sender.close();
Thread.sleep(100);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
AmqpMessage received = receiver.receiveNoWait();
assertNull(received);
Wait.assertEquals(1, queueView::getMessagesExpired);
connection.close();
// This will stop and start the server
// to make sure the message is decoded again from its binary format
// avoiding any parsing cached at the server.
server.stop();
server.start();
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
assertNotNull(dlqView);
Wait.assertEquals(1, dlqView::getMessageCount);
client = createAmqpClient();
connection = addConnection(client.connect());
session = connection.createSession();
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'");
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
received.accept();
assertNotNull("Should have read message from DLQ", received);
assertEquals(0, received.getTimeToLive());
assertNotNull(received);
assertEquals("Value1", received.getApplicationProperty("key1"));
connection.close();
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception { public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
@ -272,7 +335,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
assertEquals(1, queueView.getMessageCount()); Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
@ -280,7 +343,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
assertEquals(0, queueView.getMessagesExpired()); Wait.assertEquals(0, queueView::getMessagesExpired);
connection.close(); connection.close();
} }
@ -305,7 +368,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
Thread.sleep(50); Thread.sleep(50);
assertEquals(1, queueView.getMessageCount()); Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName()); AmqpReceiver receiver = session.createReceiver(getQueueName());

View File

@ -384,6 +384,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testSendFilterAnnotation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setText("Test-Message");
sender.send(message);
message = new AmqpMessage();
message.setMessageId("msg" + 2);
message.setMessageAnnotation("serialNo", 2);
message.setText("Test-Message 2");
sender.send(message);
sender.close();
LOG.debug("Attempting to read message with receiver");
AmqpReceiver receiver = session.createReceiver(getQueueName(), "serialNo=2");
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg2", received.getMessageId());
received.accept();
Assert.assertNull(receiver.receiveNoWait());
receiver.close();
connection.close();
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testCloseBusyReceiver() throws Exception { public void testCloseBusyReceiver() throws Exception {
final int MSG_COUNT = 20; final int MSG_COUNT = 20;