This commit is contained in:
Timothy Bish 2018-04-14 10:47:47 -04:00
commit c4763bc088
16 changed files with 193 additions and 61 deletions

View File

@ -54,15 +54,15 @@ import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer {
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private AMQSession session;
private final AMQSession session;
private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final boolean hasNotificationDestination;
private ConsumerInfo info;
private final ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private ServerConsumer serverConsumer;
private int prefetchSize;
private AtomicInteger currentWindow;
private final AtomicInteger currentWindow;
private long messagePullSequence = 0;
private MessagePullHandler messagePullHandler;
//internal means we don't expose
@ -284,7 +284,7 @@ public class AMQConsumer {
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
ref.acknowledge(transaction);
ref.acknowledge(transaction, serverConsumer);
}
} else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) {
@ -302,7 +302,7 @@ public class AMQConsumer {
}
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
ref.getQueue().expire(ref);
ref.getQueue().expire(ref, serverConsumer);
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@ -234,15 +235,20 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override
public void acknowledge(Transaction tx) throws Exception {
acknowledge(tx, AckReason.NORMAL);
acknowledge(tx, null);
}
@Override
public void acknowledge(Transaction tx, AckReason reason) throws Exception {
public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
acknowledge(tx, AckReason.NORMAL, consumer);
}
@Override
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
if (tx == null) {
getQueue().acknowledge(this, reason);
getQueue().acknowledge(this, reason, consumer);
} else {
getQueue().acknowledge(tx, this, reason);
getQueue().acknowledge(tx, this, reason, consumer);
}
}

View File

@ -89,7 +89,9 @@ public interface MessageReference {
void acknowledge(Transaction tx) throws Exception;
void acknowledge(Transaction tx, AckReason reason) throws Exception;
void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception;
void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception;
void emptyConsumerID();

View File

@ -107,11 +107,13 @@ public interface Queue extends Bindable,CriticalComponent {
void acknowledge(MessageReference ref) throws Exception;
void acknowledge(MessageReference ref, AckReason reason) throws Exception;
void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception;
void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
void acknowledge(Transaction tx, MessageReference ref) throws Exception;
void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception;
void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
void reacknowledge(Transaction tx, MessageReference ref) throws Exception;
@ -221,6 +223,8 @@ public interface Queue extends Bindable,CriticalComponent {
void expire(MessageReference ref) throws Exception;
void expire(MessageReference ref, ServerConsumer consumer) throws Exception;
boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -276,8 +277,13 @@ public class LastValueQueue extends QueueImpl {
}
@Override
public void acknowledge(Transaction tx, AckReason reason) throws Exception {
ref.acknowledge(tx, reason);
public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
ref.acknowledge(tx, consumer);
}
@Override
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
ref.acknowledge(tx, reason, consumer);
}
@Override

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@ -190,15 +191,20 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
@Override
public void acknowledge(Transaction tx) throws Exception {
acknowledge(tx, AckReason.NORMAL);
acknowledge(tx, null);
}
@Override
public void acknowledge(Transaction tx, AckReason reason) throws Exception {
public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
acknowledge(tx, AckReason.NORMAL, consumer);
}
@Override
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
if (tx == null) {
getQueue().acknowledge(this, reason);
getQueue().acknowledge(this, reason, consumer);
} else {
getQueue().acknowledge(tx, this, reason);
getQueue().acknowledge(tx, this, reason, consumer);
}
}

View File

@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@ -1223,11 +1224,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void acknowledge(final MessageReference ref) throws Exception {
acknowledge(ref, AckReason.NORMAL);
acknowledge(ref, null);
}
@Override
public void acknowledge(final MessageReference ref, AckReason reason) throws Exception {
public void acknowledge(final MessageReference ref, final ServerConsumer consumer) throws Exception {
acknowledge(ref, AckReason.NORMAL, consumer);
}
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (ref.isPaged()) {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref);
@ -1251,17 +1257,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (server != null && server.hasBrokerPlugins()) {
server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason));
server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}
}
@Override
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
acknowledge(tx, ref, AckReason.NORMAL);
acknowledge(tx, ref, AckReason.NORMAL, null);
}
@Override
public void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception {
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
@ -1289,7 +1295,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (server != null && server.hasBrokerPlugins()) {
server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason));
server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}
}
@ -1358,6 +1364,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void expire(final MessageReference ref) throws Exception {
expire(ref, null);
}
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref);
if (messageExpiryAddress == null) {
messageExpiryAddress = expiryAddressFromAddressSettings(ref);
@ -1367,17 +1378,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName());
}
move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED);
move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer);
} else {
if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
}
acknowledge(ref, AckReason.EXPIRED);
acknowledge(ref, AckReason.EXPIRED, consumer);
}
if (server != null && server.hasBrokerPlugins()) {
final SimpleString expiryAddress = messageExpiryAddress;
server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress));
server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
}
}
@ -1490,7 +1501,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
incDelivering(ref);
acknowledge(tx, ref, ackReason);
acknowledge(tx, ref, ackReason, null);
if (fromMessageReferences) {
refRemoved(ref);
}
@ -1878,7 +1889,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refRemoved(ref);
incDelivering(ref);
try {
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null);
} catch (Exception e) {
decDelivering(ref);
throw e;
@ -1922,7 +1933,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!ignored) {
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null);
refRemoved(ref);
//move(toAddress, tx, ref, false, rejectDuplicates);
}
@ -2586,7 +2597,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
postOffice.route(copyMessage, tx, false, rejectDuplicate);
if (expiry) {
acknowledge(tx, ref, AckReason.EXPIRED);
acknowledge(tx, ref, AckReason.EXPIRED, null);
} else {
acknowledge(tx, ref);
}
@ -2749,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
acknowledge(tx, ref, AckReason.EXPIRED);
acknowledge(tx, ref, AckReason.EXPIRED, null);
} else {
move(expiryAddress, tx, ref, true, true);
}
@ -2760,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
}
acknowledge(tx, ref, AckReason.EXPIRED);
acknowledge(tx, ref, AckReason.EXPIRED, null);
}
}
@ -2777,15 +2788,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
ref.acknowledge(tx, AckReason.KILLED);
ref.acknowledge(tx, AckReason.KILLED, null);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED);
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
ref.acknowledge(tx, AckReason.KILLED);
ref.acknowledge(tx, AckReason.KILLED, null);
}
}
@ -2794,7 +2805,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final Binding binding,
final MessageReference ref,
final boolean rejectDuplicate,
final AckReason reason) throws Exception {
final AckReason reason,
final ServerConsumer consumer) throws Exception {
Transaction tx;
if (originalTX != null) {
@ -2810,7 +2822,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
acknowledge(tx, ref, reason);
acknowledge(tx, ref, reason, consumer);
if (originalTX == null) {
tx.commit();
@ -3046,7 +3058,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Transaction transaction = new TransactionImpl(storageManager);
for (MessageReference reference : refs) {
incDelivering(reference); // post ack will decrement this, so need to inc
acknowledge(transaction, reference, AckReason.KILLED);
acknowledge(transaction, reference, AckReason.KILLED, null);
}
transaction.commit();
} catch (Exception e) {

View File

@ -423,7 +423,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
// With pre-ack, we ack *before* sending to the client
ref.getQueue().acknowledge(ref);
ref.getQueue().acknowledge(ref, this);
acks++;
}
@ -633,7 +633,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (!deliveringRefs.isEmpty()) {
for (MessageReference ref : deliveringRefs) {
if (performACK) {
ref.acknowledge(tx);
ref.acknowledge(tx, this);
performACK = false;
} else {
@ -863,7 +863,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
throw ils;
}
ref.acknowledge(tx);
ref.acknowledge(tx, this);
acks++;
}
@ -926,7 +926,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
throw ils;
}
ref.acknowledge(tx);
ref.acknowledge(tx, this);
acks++;

View File

@ -926,10 +926,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void expire(final long consumerID, final long messageID) throws Exception {
MessageReference ref = locateConsumer(consumerID).removeReferenceByID(messageID);
final ServerConsumer consumer = locateConsumer(consumerID);
MessageReference ref = consumer.removeReferenceByID(messageID);
if (ref != null) {
ref.getQueue().expire(ref);
ref.getQueue().expire(ref, consumer);
}
}

View File

@ -548,22 +548,57 @@ public interface ActiveMQServerPlugin {
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
* @throws ActiveMQException
*
* @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)}
*/
@Deprecated
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
}
/**
* A message has been expired
*
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
* @param consumer the Consumer that acknowledged the message - this field is optional
* and can be null
* @throws ActiveMQException
*/
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException {
}
/**
* A message has been acknowledged
*
* @param ref The acked message
* @param reason The ack reason
* @throws ActiveMQException
*
* @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)}
*/
@Deprecated
default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
}
/**
* A message has been acknowledged
*
* @param ref The acked message
* @param reason The ack reason
* @param consumer the Consumer that acknowledged the message - this field is optional
* and can be null
* @throws ActiveMQException
*
*/
default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
//by default call the old method for backwards compatibility
this.messageAcknowledged(ref, reason);
}
/**
* Before a bridge is deployed
*

View File

@ -617,7 +617,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
* @throws ActiveMQException
*/
@Override
public void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
if (logAll || logDeliveringEvents) {
//details - debug logging
@ -629,7 +629,8 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
// info level logging
LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
(ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null),
(consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null),
(consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())),
(queue == null ? UNAVAILABLE : queue.getName().toString()),
reason);
}

View File

@ -129,9 +129,9 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
void messageExpired(MessageReference message, SimpleString messageExpiryAddress);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841014, value = "acknowledged message ID: {0}, with messageRef consumerID: {1}, messageRef QueueName: {2}," +
" with ackReason: {3}", format = Message.Format.MESSAGE_FORMAT)
void messageAcknowledged(String messageID, String consumerID, String queueName, AckReason reason);
@Message(id = 841014, value = "acknowledged message ID: {0}, messageRef sessionID: {1}, with messageRef consumerID: {2}, messageRef QueueName: {3}," +
" with ackReason: {4}", format = Message.Format.MESSAGE_FORMAT)
void messageAcknowledged(String messageID, String sessionID, String consumerID, String queueName, AckReason reason);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841015, value = "deployed bridge: {0}", format = Message.Format.MESSAGE_FORMAT)

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -959,7 +960,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@ -969,7 +975,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@ -1168,6 +1174,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
}

View File

@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -76,6 +77,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
@ -114,6 +116,13 @@ public class CorePluginTest extends JMSTestBase {
@Test
public void testSendReceive() throws Exception {
final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
assertEquals(AckReason.NORMAL, reason);
assertNotNull(consumer);
});
server.registerBrokerPlugin(ackVerifier);
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -139,6 +148,8 @@ public class CorePluginTest extends JMSTestBase {
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get());
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get());
assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1);
assertFalse(ackVerifier.getErrorMsg(), ackVerifier.hasError());
}
@Test
@ -199,7 +210,8 @@ public class CorePluginTest extends JMSTestBase {
@Test
public void testMessageExpireServer() throws Exception {
server.registerBrokerPlugin(new ExpiredPluginVerifier());
final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
server.registerBrokerPlugin(expiredVerifier);
conn = cf.createConnection();
conn.setClientID("test");
@ -227,11 +239,13 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
}
@Test
public void testMessageExpireClient() throws Exception {
server.registerBrokerPlugin(new ExpiredPluginVerifier());
final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
server.registerBrokerPlugin(expiredVerifier);
conn = cf.createConnection();
conn.start();
@ -260,6 +274,7 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
}
@Test
@ -324,11 +339,31 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
}
private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
private class AckPluginVerifier implements ActiveMQServerPlugin {
private BiConsumer<ServerConsumer, AckReason> assertion;
private Throwable error;
AckPluginVerifier(BiConsumer<ServerConsumer, AckReason> assertion) {
this.assertion = assertion;
}
@Override
public void messageAcknowledged(MessageReference ref, AckReason reason) {
assertEquals(AckReason.EXPIRED, reason);
public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
try {
assertion.accept(consumer, reason);
} catch (Throwable e) {
error = e;
throw e;
}
}
private boolean hasError() {
return error != null;
}
private String getErrorMsg() {
return hasError() ? error.getMessage() : "";
}
}

View File

@ -272,13 +272,13 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
}
@Override
public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
public void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) {
Preconditions.checkNotNull(message);
methodCalled(MESSAGE_EXPIRED);
}
@Override
public void messageAcknowledged(MessageReference ref, AckReason reason) {
public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
Preconditions.checkNotNull(ref);
Preconditions.checkNotNull(reason);
methodCalled(MESSAGE_ACKED);

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -213,7 +214,13 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
public void acknowledge(final MessageReference ref, ServerConsumer consumer) throws Exception {
// no-op
}
@Override
public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
// no-op
}
@ -225,7 +232,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
// no-op
}
@ -310,6 +317,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
// no-op
}
@Override
public boolean expireReference(final long messageID) throws Exception {
// no-op