diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index b478c7dbc3..e0b02ae643 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -54,15 +54,15 @@ import org.apache.activemq.command.RemoveInfo; public class AMQConsumer { private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications"; - private AMQSession session; + private final AMQSession session; private final org.apache.activemq.command.ActiveMQDestination openwireDestination; private final boolean hasNotificationDestination; - private ConsumerInfo info; + private final ConsumerInfo info; private final ScheduledExecutorService scheduledPool; private ServerConsumer serverConsumer; private int prefetchSize; - private AtomicInteger currentWindow; + private final AtomicInteger currentWindow; private long messagePullSequence = 0; private MessagePullHandler messagePullHandler; //internal means we don't expose @@ -284,7 +284,7 @@ public class AMQConsumer { if (ack.isIndividualAck() || ack.isStandardAck()) { for (MessageReference ref : ackList) { - ref.acknowledge(transaction); + ref.acknowledge(transaction, serverConsumer); } } else if (ack.isPoisonAck()) { for (MessageReference ref : ackList) { @@ -302,7 +302,7 @@ public class AMQConsumer { } if (ack.isExpiredAck()) { for (MessageReference ref : ackList) { - ref.getQueue().expire(ref); + ref.getQueue().expire(ref, serverConsumer); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 9a37bd81f6..081f7daa07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; @@ -234,15 +235,20 @@ public class PagedReferenceImpl extends LinkedListImpl.Node @Override public void acknowledge(Transaction tx) throws Exception { - acknowledge(tx, AckReason.NORMAL); + acknowledge(tx, null); } @Override - public void acknowledge(Transaction tx, AckReason reason) throws Exception { + public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception { + acknowledge(tx, AckReason.NORMAL, consumer); + } + + @Override + public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception { if (tx == null) { - getQueue().acknowledge(this, reason); + getQueue().acknowledge(this, reason, consumer); } else { - getQueue().acknowledge(tx, this, reason); + getQueue().acknowledge(tx, this, reason, consumer); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 0db84c543b..48a589fd55 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -89,7 +89,9 @@ public interface MessageReference { void acknowledge(Transaction tx) throws Exception; - void acknowledge(Transaction tx, AckReason reason) throws Exception; + void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception; + + void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception; void emptyConsumerID(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index c355dbf96d..a23535274f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -107,11 +107,13 @@ public interface Queue extends Bindable,CriticalComponent { void acknowledge(MessageReference ref) throws Exception; - void acknowledge(MessageReference ref, AckReason reason) throws Exception; + void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception; + + void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception; void acknowledge(Transaction tx, MessageReference ref) throws Exception; - void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception; + void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception; void reacknowledge(Transaction tx, MessageReference ref) throws Exception; @@ -221,6 +223,8 @@ public interface Queue extends Bindable,CriticalComponent { void expire(MessageReference ref) throws Exception; + void expire(MessageReference ref, ServerConsumer consumer) throws Exception; + boolean sendMessageToDeadLetterAddress(long messageID) throws Exception; int sendMessagesToDeadLetterAddress(Filter filter) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index e3097d195a..fc965912f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -276,8 +277,13 @@ public class LastValueQueue extends QueueImpl { } @Override - public void acknowledge(Transaction tx, AckReason reason) throws Exception { - ref.acknowledge(tx, reason); + public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception { + ref.acknowledge(tx, consumer); + } + + @Override + public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception { + ref.acknowledge(tx, reason, consumer); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 4d077aedfe..96975e0046 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; @@ -190,15 +191,20 @@ public class MessageReferenceImpl extends LinkedListImpl.Node plugin.messageAcknowledged(ref, reason)); + server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); } } @Override public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception { - acknowledge(tx, ref, AckReason.NORMAL); + acknowledge(tx, ref, AckReason.NORMAL, null); } @Override - public void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception { + public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { if (ref.isPaged()) { pageSubscription.ackTx(tx, (PagedReference) ref); @@ -1289,7 +1295,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (server != null && server.hasBrokerPlugins()) { - server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason)); + server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); } } @@ -1358,6 +1364,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void expire(final MessageReference ref) throws Exception { + expire(ref, null); + } + + @Override + public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception { SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref); if (messageExpiryAddress == null) { messageExpiryAddress = expiryAddressFromAddressSettings(ref); @@ -1367,17 +1378,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (logger.isTraceEnabled()) { logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName()); } - move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED); + move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); } - acknowledge(ref, AckReason.EXPIRED); + acknowledge(ref, AckReason.EXPIRED, consumer); } if (server != null && server.hasBrokerPlugins()) { final SimpleString expiryAddress = messageExpiryAddress; - server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress)); + server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer)); } } @@ -1490,7 +1501,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception { incDelivering(ref); - acknowledge(tx, ref, ackReason); + acknowledge(tx, ref, ackReason, null); if (fromMessageReferences) { refRemoved(ref); } @@ -1878,7 +1889,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { refRemoved(ref); incDelivering(ref); try { - move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL); + move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null); } catch (Exception e) { decDelivering(ref); throw e; @@ -1922,7 +1933,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (!ignored) { - move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL); + move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null); refRemoved(ref); //move(toAddress, tx, ref, false, rejectDuplicates); } @@ -2586,7 +2597,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { postOffice.route(copyMessage, tx, false, rejectDuplicate); if (expiry) { - acknowledge(tx, ref, AckReason.EXPIRED); + acknowledge(tx, ref, AckReason.EXPIRED, null); } else { acknowledge(tx, ref); } @@ -2749,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); - acknowledge(tx, ref, AckReason.EXPIRED); + acknowledge(tx, ref, AckReason.EXPIRED, null); } else { move(expiryAddress, tx, ref, true, true); } @@ -2760,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name); } - acknowledge(tx, ref, AckReason.EXPIRED); + acknowledge(tx, ref, AckReason.EXPIRED, null); } } @@ -2777,15 +2788,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress); - ref.acknowledge(tx, AckReason.KILLED); + ref.acknowledge(tx, AckReason.KILLED, null); } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED); + move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null); } } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name); - ref.acknowledge(tx, AckReason.KILLED); + ref.acknowledge(tx, AckReason.KILLED, null); } } @@ -2794,7 +2805,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final Binding binding, final MessageReference ref, final boolean rejectDuplicate, - final AckReason reason) throws Exception { + final AckReason reason, + final ServerConsumer consumer) throws Exception { Transaction tx; if (originalTX != null) { @@ -2810,7 +2822,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { postOffice.route(copyMessage, tx, false, rejectDuplicate, binding); - acknowledge(tx, ref, reason); + acknowledge(tx, ref, reason, consumer); if (originalTX == null) { tx.commit(); @@ -3046,7 +3058,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { Transaction transaction = new TransactionImpl(storageManager); for (MessageReference reference : refs) { incDelivering(reference); // post ack will decrement this, so need to inc - acknowledge(transaction, reference, AckReason.KILLED); + acknowledge(transaction, reference, AckReason.KILLED, null); } transaction.commit(); } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 8ef0fa1c9f..c81105a734 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -423,7 +423,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } // With pre-ack, we ack *before* sending to the client - ref.getQueue().acknowledge(ref); + ref.getQueue().acknowledge(ref, this); acks++; } @@ -633,7 +633,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (!deliveringRefs.isEmpty()) { for (MessageReference ref : deliveringRefs) { if (performACK) { - ref.acknowledge(tx); + ref.acknowledge(tx, this); performACK = false; } else { @@ -863,7 +863,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { throw ils; } - ref.acknowledge(tx); + ref.acknowledge(tx, this); acks++; } @@ -926,7 +926,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { throw ils; } - ref.acknowledge(tx); + ref.acknowledge(tx, this); acks++; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 55125bde33..f92b45a1c7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -926,10 +926,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public void expire(final long consumerID, final long messageID) throws Exception { - MessageReference ref = locateConsumer(consumerID).removeReferenceByID(messageID); + final ServerConsumer consumer = locateConsumer(consumerID); + MessageReference ref = consumer.removeReferenceByID(messageID); if (ref != null) { - ref.getQueue().expire(ref); + ref.getQueue().expire(ref, consumer); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index ed00ab05c6..db8a922b37 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -548,22 +548,57 @@ public interface ActiveMQServerPlugin { * @param message The expired message * @param messageExpiryAddress The message expiry address if exists * @throws ActiveMQException + * + * @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)} */ + @Deprecated default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException { } + /** + * A message has been expired + * + * @param message The expired message + * @param messageExpiryAddress The message expiry address if exists + * @param consumer the Consumer that acknowledged the message - this field is optional + * and can be null + * @throws ActiveMQException + */ + default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException { + + } + /** * A message has been acknowledged * * @param ref The acked message * @param reason The ack reason * @throws ActiveMQException + * + * @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)} */ + @Deprecated default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { } + /** + * A message has been acknowledged + * + * @param ref The acked message + * @param reason The ack reason + * @param consumer the Consumer that acknowledged the message - this field is optional + * and can be null + * @throws ActiveMQException + * + */ + default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException { + //by default call the old method for backwards compatibility + this.messageAcknowledged(ref, reason); + } + + /** * Before a bridge is deployed * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java index 3e66bf1c29..140c3e1c2e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java @@ -617,7 +617,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial * @throws ActiveMQException */ @Override - public void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException { + public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException { if (logAll || logDeliveringEvents) { //details - debug logging @@ -629,7 +629,8 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial // info level logging LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), - (ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null), + (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null), + (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())), (queue == null ? UNAVAILABLE : queue.getName().toString()), reason); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java index 90c5dec44d..f519dd0e0e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java @@ -129,9 +129,9 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { void messageExpired(MessageReference message, SimpleString messageExpiryAddress); @LogMessage(level = Logger.Level.INFO) - @Message(id = 841014, value = "acknowledged message ID: {0}, with messageRef consumerID: {1}, messageRef QueueName: {2}," + - " with ackReason: {3}", format = Message.Format.MESSAGE_FORMAT) - void messageAcknowledged(String messageID, String consumerID, String queueName, AckReason reason); + @Message(id = 841014, value = "acknowledged message ID: {0}, messageRef sessionID: {1}, with messageRef consumerID: {2}, messageRef QueueName: {3}," + + " with ackReason: {4}", format = Message.Format.MESSAGE_FORMAT) + void messageAcknowledged(String messageID, String sessionID, String consumerID, String queueName, AckReason reason); @LogMessage(level = Logger.Level.INFO) @Message(id = 841015, value = "deployed bridge: {0}", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 2cae2c7985..fdce2d03cb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.RandomUtil; @@ -959,7 +960,12 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void acknowledge(MessageReference ref, AckReason reason) throws Exception { + public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception { + + } + + @Override + public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception { } @@ -969,7 +975,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception { + public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception { } @@ -1168,6 +1174,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void expire(MessageReference ref, ServerConsumer consumer) throws Exception { + + } + @Override public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java index 6545f705e7..f83b3eef25 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -58,6 +58,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -76,6 +77,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; @@ -114,6 +116,13 @@ public class CorePluginTest extends JMSTestBase { @Test public void testSendReceive() throws Exception { + final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> { + assertEquals(AckReason.NORMAL, reason); + assertNotNull(consumer); + }); + + server.registerBrokerPlugin(ackVerifier); + conn = cf.createConnection(); conn.start(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -139,6 +148,8 @@ public class CorePluginTest extends JMSTestBase { assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get()); assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get()); assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1); + + assertFalse(ackVerifier.getErrorMsg(), ackVerifier.hasError()); } @Test @@ -199,7 +210,8 @@ public class CorePluginTest extends JMSTestBase { @Test public void testMessageExpireServer() throws Exception { - server.registerBrokerPlugin(new ExpiredPluginVerifier()); + final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason)); + server.registerBrokerPlugin(expiredVerifier); conn = cf.createConnection(); conn.setClientID("test"); @@ -227,11 +239,13 @@ public class CorePluginTest extends JMSTestBase { verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION); + assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError()); } @Test public void testMessageExpireClient() throws Exception { - server.registerBrokerPlugin(new ExpiredPluginVerifier()); + final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason)); + server.registerBrokerPlugin(expiredVerifier); conn = cf.createConnection(); conn.start(); @@ -260,6 +274,7 @@ public class CorePluginTest extends JMSTestBase { verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION); + assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError()); } @Test @@ -324,11 +339,31 @@ public class CorePluginTest extends JMSTestBase { verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS); } - private class ExpiredPluginVerifier implements ActiveMQServerPlugin { + private class AckPluginVerifier implements ActiveMQServerPlugin { + + private BiConsumer assertion; + private Throwable error; + + AckPluginVerifier(BiConsumer assertion) { + this.assertion = assertion; + } @Override - public void messageAcknowledged(MessageReference ref, AckReason reason) { - assertEquals(AckReason.EXPIRED, reason); + public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) { + try { + assertion.accept(consumer, reason); + } catch (Throwable e) { + error = e; + throw e; + } + } + + private boolean hasError() { + return error != null; + } + + private String getErrorMsg() { + return hasError() ? error.getMessage() : ""; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index 8e343ca902..8977ba5f06 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -272,13 +272,13 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { } @Override - public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { + public void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) { Preconditions.checkNotNull(message); methodCalled(MESSAGE_EXPIRED); } @Override - public void messageAcknowledged(MessageReference ref, AckReason reason) { + public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) { Preconditions.checkNotNull(ref); Preconditions.checkNotNull(reason); methodCalled(MESSAGE_ACKED); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 25300d3c9d..bc628f580c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -213,7 +214,13 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void acknowledge(MessageReference ref, AckReason reason) throws Exception { + public void acknowledge(final MessageReference ref, ServerConsumer consumer) throws Exception { + // no-op + + } + + @Override + public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception { // no-op } @@ -225,7 +232,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception { + public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception { // no-op } @@ -310,6 +317,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception { + // no-op + + } + @Override public boolean expireReference(final long messageID) throws Exception { // no-op