diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java index bab9ca9b7b..f523664b4a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java @@ -17,12 +17,15 @@ package org.apache.activemq.artemis.protocol.amqp.broker; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ACK_REASON; + /** Warning: do not use this class outside of the broker implementation. * This is exposing package methods on this package that are not meant to be used on user's application. */ public class AMQPMessageBrokerAccessor { @@ -37,6 +40,11 @@ public class AMQPMessageBrokerAccessor { return message.getMessageAnnotation(symbol); } + public static AckReason getMessageAnnotationAckReason(AMQPMessage message) { + Number reasonVal = (Number) getMessageAnnotationProperty(message, ACK_REASON); + return reasonVal == null ? AckReason.NORMAL : AckReason.fromValue(reasonVal.byteValue()); + } + /** Warning: this is a method specific to the broker. Do not use it on user's application. */ public static Header getCurrentHeader(AMQPMessage message) { return message.getCurrentHeader(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index a70773634f..77303074be 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -34,7 +34,6 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.server.mirror.MirrorController; -import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; @@ -52,6 +51,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im private static final Logger logger = Logger.getLogger(AMQPMirrorControllerSource.class); public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-amq-mr-ev-type"); + public static final Symbol ACK_REASON = Symbol.getSymbol("x-opt-amq-mr-ack-reason"); public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr"); public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu"); public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id"); @@ -74,7 +74,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString()); - private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new MirrorControlRouting(null)); + private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true)); final Queue snfQueue; final ActiveMQServer server; @@ -350,32 +350,24 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im if (logger.isTraceEnabled()) { logger.trace(server + " sending ack message from server " + nodeID + " with messageID=" + internalID); } - Message message = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID); + Message message = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason); route(server, message); ref.getMessage().usageDown(); } private Message createMessage(SimpleString address, SimpleString queue, Object event, String brokerID, Object body) { - return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body); + return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, null); + } + + private Message createMessage(SimpleString address, SimpleString queue, Object event, String brokerID, Object body, AckReason ackReason) { + return AMQPMirrorMessageFactory.createMessage(snfQueue.getAddress().toString(), address, queue, event, brokerID, body, ackReason); } public static void route(ActiveMQServer server, Message message) throws Exception { message.setMessageID(server.getStorageManager().generateID()); - MirrorControlRouting ctx = mirrorControlRouting.get(); + RoutingContext ctx = mirrorControlRouting.get(); ctx.clear(); server.getPostOffice().route(message, ctx, false); } - private static class MirrorControlRouting extends RoutingContextImpl { - - MirrorControlRouting(Transaction transaction) { - super(transaction); - } - - @Override - public boolean isMirrorController() { - return true; - } - } - } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 5c664bc7e3..4be5968123 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -226,6 +226,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } else if (eventType.equals(POST_ACK)) { String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS); String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID); + + AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(message); + if (nodeID == null) { nodeID = getRemoteMirrorId(); // not sending the nodeID means it's data generated on that broker } @@ -235,7 +238,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement if (logger.isDebugEnabled()) { logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID); } - if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation)) { + if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation, ackReason)) { messageAckOperation = null; } } @@ -333,7 +336,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } } - public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage) throws Exception { + public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception { final Queue targetQueue = server.locateQueue(queue); if (targetQueue == null) { @@ -352,9 +355,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue); } - performAck(nodeID, messageID, targetQueue, ackMessage, true); + performAck(nodeID, messageID, targetQueue, ackMessage, reason, true); return true; - } @@ -363,18 +365,19 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck); } - private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) { + private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, boolean retry) { if (logger.isTraceEnabled()) { logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName()); } MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); + if (reference == null && retry) { if (logger.isDebugEnabled()) { logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID); } targetQueue.flushOnIntermediate(() -> { recoverContext(); - performAck(nodeID, messageID, targetQueue, ackMessageOperation, false); + performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, false); }); return; } @@ -383,13 +386,24 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID); } try { - targetQueue.acknowledge(reference); + switch (reason) { + case EXPIRED: + targetQueue.expire(reference, null, false); + break; + default: + targetQueue.acknowledge(null, reference, reason, null, false); + break; + } OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation); } catch (Exception e) { logger.warn(e.getMessage(), e); } } else { - performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation); + if (reason != AckReason.EXPIRED) { + // if expired, we don't need to check on paging + // as the message will expire again when depaged (if on paging) + performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation); + } } } @@ -508,7 +522,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement if (reference == null) { return false; } else { - targetQueue.acknowledge(reference); + targetQueue.acknowledge(null, reference, AckReason.NORMAL, null, false); OperationContextImpl.getContext().executeOnCompletion(operation); return true; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorMessageFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorMessageFactory.java index 9b61df9cf1..72ffabef82 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorMessageFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorMessageFactory.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; @@ -35,6 +36,7 @@ import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.WritableBuffer; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ACK_REASON; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE; @@ -49,12 +51,19 @@ public class AMQPMirrorMessageFactory { * This method is open to make it testable, * do not use on your applications. */ - public static Message createMessage(String to, SimpleString address, SimpleString queue, Object event, String brokerID, Object body) { + public static Message createMessage(String to, SimpleString address, SimpleString queue, Object event, String brokerID, Object body, AckReason ackReason) { Header header = new Header(); header.setDurable(true); Map annotations = new HashMap<>(); annotations.put(EVENT_TYPE, event); + + if (ackReason != null && ackReason != AckReason.NORMAL) { + // if the ack reason is normal, we just send it null as the target will assume it as normal + // just to save some bits + annotations.put(ACK_REASON, ackReason.getVal()); + } + if (address != null) { annotations.put(ADDRESS, address.toString()); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index ee5d0cbce1..e061712a0b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -327,7 +327,7 @@ public class AMQConsumer { if (ack.isExpiredAck()) { for (MessageReference ref : ackList) { - ref.getQueue().expire(ref, serverConsumer); + ref.getQueue().expire(ref, serverConsumer, true); } } else if (removeReferences) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 5bf4ae2cbb..40de11a123 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -282,11 +282,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node @Override public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception { - if (tx == null) { - getQueue().acknowledge(this, reason, consumer); - } else { - getQueue().acknowledge(tx, this, reason, consumer); - } + getQueue().acknowledge(tx, this, reason, consumer, true); } /* (non-Javadoc) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 744f41f2b8..6f37ea3fa3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -245,7 +245,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public void postAcknowledge(MessageReference ref, AckReason reason) { - if (mirrorControllerSource != null) { + if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // we don't send replacements on LVQ as they are replaced themselves on the target try { mirrorControllerSource.postAcknowledge(ref, reason); } catch (Exception e) { @@ -1573,8 +1573,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - if (mirrorControllerSource != null && !context.isMirrorController()) { - // we check for isMirrorController as to avoid recursive loop from there + if (mirrorControllerSource != null && !context.isMirrorDisabled()) { + // we check for isMirrorDisabled as to avoid recursive loop from there mirrorControllerSource.sendMessage(message, context, refs); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index fc09776228..67e2b0b31c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -220,7 +220,7 @@ public interface Queue extends Bindable,CriticalComponent { void acknowledge(Transaction tx, MessageReference ref) throws Exception; - void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception; + void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception; void reacknowledge(Transaction tx, MessageReference ref) throws Exception; @@ -341,7 +341,7 @@ public interface Queue extends Bindable,CriticalComponent { void expire(MessageReference ref) throws Exception; - void expire(MessageReference ref, ServerConsumer consumer) throws Exception; + void expire(MessageReference ref, ServerConsumer consumer, boolean delivering) throws Exception; boolean sendMessageToDeadLetterAddress(long messageID) throws Exception; @@ -506,6 +506,8 @@ public interface Queue extends Bindable,CriticalComponent { void postAcknowledge(MessageReference ref, AckReason reason); + void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering); + /** * @return the user associated with this queue */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index fa781fdb86..db9c360b38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -40,7 +40,9 @@ public interface RoutingContext { /** If the routing is from MirrorController, we don't redo mirrorController * to avoid*/ - boolean isMirrorController(); + boolean isMirrorDisabled(); + + RoutingContext setMirrorDisabled(boolean mirrorDisabled); /** return true if every queue routed is internal */ boolean isInternal(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java index 164f141e3d..0c2ef36ea4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java @@ -18,5 +18,33 @@ package org.apache.activemq.artemis.core.server.impl; public enum AckReason { - KILLED, EXPIRED, NORMAL, REPLACED + NORMAL((byte)0), KILLED((byte)1), EXPIRED((byte)2), REPLACED((byte)3); + + private byte value; + + AckReason(byte value) { + this.value = value; + } + + public byte getVal() { + return value; + } + + public static AckReason fromValue(byte value) { + switch (value) { + case 0: + return NORMAL; + case 1: + return KILLED; + case 2: + return EXPIRED; + case 3: + return REPLACED; + default: + // in case a newer version connects with a not known type + // this will just play safe and use the NORMAL ack mode + return NORMAL; + } + } + } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 1ea280c5a3..f01403fb90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -281,7 +281,7 @@ public class LastValueQueue extends QueueImpl { @Override public QueueConfiguration getQueueConfiguration() { - return super.getQueueConfiguration().setLastValue(true); + return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey); } private void replaceLVQMessage(MessageReference ref, HolderReference hr) { @@ -319,11 +319,11 @@ public class LastValueQueue extends QueueImpl { public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, - ServerConsumer consumer) throws Exception { + ServerConsumer consumer, boolean delivering) throws Exception { if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) { removeIfCurrent(ref); } - super.acknowledge(tx, ref, reason, consumer); + super.acknowledge(tx, ref, reason, consumer, delivering); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index ca1218069e..262689aa53 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -275,7 +275,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node server.locateQueue(queueName) != null); + org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(queueName); + Assert.assertNotNull(queue); + return queue; + } + + @Test + public void testExpiryNoReaper() throws Exception { + internalExpiry(false); + } + + @Test + public void testExpiry() throws Exception { + internalExpiry(true); + } + + private void internalExpiry(boolean useReaper) throws Exception { + server.setIdentity("Server1"); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.getConfiguration().addAddressSetting("#", new AddressSettings().setExpiryAddress(SimpleString.toSimpleString("expiryQueue"))); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("expiryQueue")); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration("expiryQueue").setRoutingType(RoutingType.ANYCAST)); + if (!useReaper) { + server.getConfiguration().setMessageExpiryScanPeriod(-1); + } + + server.start(); + + server_2 = createServer(AMQP_PORT_2, false); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("expiryQueue")); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration("expiryQueue").setRoutingType(RoutingType.ANYCAST)); + if (!useReaper) { + server_2.getConfiguration().setMessageExpiryScanPeriod(-1); + } + + server_2.setIdentity("Server2"); + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setExpiryAddress(SimpleString.toSimpleString("expiryQueue"))); + + server_2.start(); + + org.apache.activemq.artemis.core.server.Queue to1 = locateQueueWithWait(server_2, "$ACTIVEMQ_ARTEMIS_MIRROR_to_1"); + org.apache.activemq.artemis.core.server.Queue to2 = locateQueueWithWait(server, "$ACTIVEMQ_ARTEMIS_MIRROR_to_2"); + org.apache.activemq.artemis.core.server.Queue expiry1 = locateQueueWithWait(server, "expiryQueue"); + org.apache.activemq.artemis.core.server.Queue expiry2 = locateQueueWithWait(server_2, "expiryQueue"); + + server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + org.apache.activemq.artemis.core.server.Queue queueOnServer1 = locateQueueWithWait(server, getQueueName()); + org.apache.activemq.artemis.core.server.Queue queueOnServer2 = locateQueueWithWait(server_2, getQueueName()); + + ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + Connection connection1 = cf1.createConnection(); + Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED); + connection1.start(); + + Queue queue = session1.createQueue(getQueueName()); + + MessageProducer producerServer1 = session1.createProducer(queue); + + producerServer1.setTimeToLive(1000); + + TextMessage message = session1.createTextMessage("test"); + message.setIntProperty("i", 0); + message.setStringProperty("server", server.getIdentity()); + producerServer1.send(message); + session1.commit(); + + Wait.assertEquals(1L, queueOnServer1::getMessageCount, 1000, 100); + Wait.assertEquals(1L, queueOnServer2::getMessageCount, 1000, 100); + + if (useReaper) { + // if using the reaper, I want to test what would happen with races between the reaper and the Mirror target + // for that reason we only pause the SNFs if we are using the reaper + to1.pause(); + to2.pause(); + } + + Thread.sleep(1500); + if (!useReaper) { + queueOnServer1.expireReferences(); // we will expire in just on queue hoping the other gets through mirror + } + + Wait.assertEquals(0L, queueOnServer1::getMessageCount, 2000, 100); + Wait.assertEquals(0L, queueOnServer2::getMessageCount, 2000, 100); + + Wait.assertEquals(1L, expiry1::getMessageCount, 1000, 100); + Wait.assertEquals(1L, expiry2::getMessageCount, 1000, 100); + + to1.resume(); + to2.resume(); + + if (!useReaper) { + queueOnServer1.expireReferences(); // in just one queue + } + + Wait.assertEquals(1L, expiry1::getMessageCount, 1000, 100); + Wait.assertEquals(1L, expiry2::getMessageCount, 1000, 100); + + connection1.close(); + + server_2.stop(); + server.stop(); + } + + @Test + public void testDLA() throws Exception { + server.setIdentity("Server1"); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2)); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue")); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST)); + server.getConfiguration().setMessageExpiryScanPeriod(-1); + + server.start(); + + server_2 = createServer(AMQP_PORT_2, false); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue")); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST)); + server_2.getConfiguration().setMessageExpiryScanPeriod(-1); + + server_2.setIdentity("Server2"); + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2)); + + server_2.start(); + + org.apache.activemq.artemis.core.server.Queue to1 = locateQueueWithWait(server_2, "$ACTIVEMQ_ARTEMIS_MIRROR_to_1"); + org.apache.activemq.artemis.core.server.Queue to2 = locateQueueWithWait(server, "$ACTIVEMQ_ARTEMIS_MIRROR_to_2"); + org.apache.activemq.artemis.core.server.Queue dlq1 = locateQueueWithWait(server, "deadLetterQueue"); + org.apache.activemq.artemis.core.server.Queue dlq2 = locateQueueWithWait(server_2, "deadLetterQueue"); + + server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + org.apache.activemq.artemis.core.server.Queue queueOnServer1 = locateQueueWithWait(server, getQueueName()); + org.apache.activemq.artemis.core.server.Queue queueOnServer2 = locateQueueWithWait(server_2, getQueueName()); + + ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + Connection connection1 = cf1.createConnection(); + Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED); + connection1.start(); + + Queue queue = session1.createQueue(getQueueName()); + + MessageProducer producerServer1 = session1.createProducer(queue); + + TextMessage message = session1.createTextMessage("test"); + message.setIntProperty("i", 0); + message.setStringProperty("server", server.getIdentity()); + producerServer1.send(message); + session1.commit(); + + + MessageConsumer consumer1 = session1.createConsumer(queue); + connection1.start(); + + for (int i = 0; i < 2; i++) { + TextMessage messageToCancel = (TextMessage) consumer1.receive(1000); + Assert.assertNotNull(messageToCancel); + session1.rollback(); + } + + Assert.assertNull(consumer1.receiveNoWait()); + + Wait.assertEquals(0L, queueOnServer1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, queueOnServer2::getMessageCount, 1000, 100); + + Wait.assertEquals(1L, dlq1::getMessageCount, 1000, 100); + Wait.assertEquals(1L, dlq2::getMessageCount, 1000, 100); + + + dlq1.retryMessages(new Filter() { + @Override + public boolean match(Message message) { + return true; + } + + @Override + public boolean match(Map map) { + return true; + } + + @Override + public boolean match(Filterable filterable) { + return true; + } + + @Override + public SimpleString getFilterString() { + return SimpleString.toSimpleString("Test"); + } + }); + + Wait.assertEquals(1L, queueOnServer1::getMessageCount, 1000, 100); + Wait.assertEquals(1L, queueOnServer2::getMessageCount, 1000, 100); + + Wait.assertEquals(0L, dlq1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, dlq2::getMessageCount, 1000, 100); + + + connection1.close(); + + server_2.stop(); + server.stop(); + } + + + @Test + public void testLVQ() throws Exception { + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + + server.setIdentity("Server1"); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2)); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue")); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST)); + + String lvqName = "testLVQ_" + RandomUtil.randomString(); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(lvqName)); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration(lvqName).setRoutingType(RoutingType.ANYCAST).setLastValue(true).setLastValueKey("KEY")); + server.getConfiguration().setMessageExpiryScanPeriod(-1); + + server.start(); + + server_2 = createServer(AMQP_PORT_2, false); + + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue")); + server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST)); + server_2.getConfiguration().setMessageExpiryScanPeriod(-1); + + server_2.setIdentity("Server2"); + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2)); + server_2.start(); + + org.apache.activemq.artemis.core.server.Queue lvqQueue1 = locateQueueWithWait(server, lvqName); + org.apache.activemq.artemis.core.server.Queue lvqQueue2 = locateQueueWithWait(server, lvqName); + + Assert.assertTrue(lvqQueue1.isLastValue()); + Assert.assertTrue(lvqQueue2.isLastValue()); + Assert.assertTrue(lvqQueue1 instanceof LastValueQueue); + Assert.assertTrue(lvqQueue2 instanceof LastValueQueue); + Assert.assertEquals("KEY", lvqQueue1.getQueueConfiguration().getLastValueKey().toString()); + Assert.assertEquals("KEY", lvqQueue2.getQueueConfiguration().getLastValueKey().toString()); + + ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + Connection connection1 = cf1.createConnection(); + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection1.start(); + + Queue queue = session1.createQueue(lvqName); + + MessageProducer producerServer1 = session1.createProducer(queue); + + for (int i = 0; i < 1000; i++) { + TextMessage message = session1.createTextMessage("test"); + message.setIntProperty("i", 0); + message.setStringProperty("KEY", "" + (i % 10)); + producerServer1.send(message); + } + + Wait.assertEquals(10L, lvqQueue1::getMessageCount, 2000, 100); + Wait.assertEquals(10L, lvqQueue2::getMessageCount, 2000, 100); + + connection1.close(); + + server_2.stop(); + server.stop(); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222153")); + } + + @Test public void testSyncData() throws Exception { int NUMBER_OF_MESSAGES = 100; @@ -324,6 +647,105 @@ public class BrokerInSyncTest extends AmqpClientTestSupport { System.out.println("Queue on Server 1 = " + queueOnServer1.getMessageCount()); System.out.println("Queue on Server 2 = " + queueOnServer2.getMessageCount()); + Assert.assertEquals(0, queueOnServer1.getDeliveringCount()); + Assert.assertEquals(0, queueOnServer2.getDeliveringCount()); + Assert.assertEquals(0, queueOnServer1.getDurableDeliveringCount()); + Assert.assertEquals(0, queueOnServer2.getDurableDeliveringCount()); + Assert.assertEquals(0, queueOnServer1.getDurableDeliveringSize()); + Assert.assertEquals(0, queueOnServer2.getDurableDeliveringSize()); + Assert.assertEquals(0, queueOnServer1.getDeliveringSize()); + Assert.assertEquals(0, queueOnServer2.getDeliveringSize()); + + server_2.stop(); + server.stop(); + } + + + + @Test + public void testStats() throws Exception { + int NUMBER_OF_MESSAGES = 1; + server.setIdentity("Server1"); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + server.start(); + + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("Server2"); + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server_2.start(); + + server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + Connection connection1 = cf1.createConnection(); + Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED); + connection1.start(); + + ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); + Connection connection2 = cf2.createConnection(); + Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED); + connection2.start(); + + Queue queue = session1.createQueue(getQueueName()); + + MessageProducer producerServer1 = session1.createProducer(queue); + MessageProducer producerServer2 = session2.createProducer(queue); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = session1.createTextMessage("test " + i); + message.setIntProperty("i", i); + message.setStringProperty("server", server.getIdentity()); + producerServer1.send(message); + } + session1.commit(); + + org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server.locateQueue(getQueueName()); + org.apache.activemq.artemis.core.server.Queue queueOnServer2 = server_2.locateQueue(getQueueName()); + Assert.assertNotNull(queueOnServer1); + Assert.assertNotNull(queueOnServer2); + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount); + + Assert.assertEquals(0, queueOnServer1.getDeliveringSize()); + Assert.assertEquals(0, queueOnServer2.getDeliveringSize()); + + MessageConsumer consumerOn1 = session1.createConsumer(queue); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumerOn1.receive(5000); + logger.debug("### Client acking message(" + i + ") on server 1, a message that was original sent on " + message.getStringProperty("server") + " text = " + message.getText()); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("i")); + Assert.assertEquals("test " + i, message.getText()); + session1.commit(); + int fi = i; + } + + Wait.assertEquals(0L, queueOnServer1::getMessageCount, 2000, 100); + Wait.assertEquals(0L, queueOnServer2::getMessageCount, 2000, 100); + Assert.assertEquals(0, queueOnServer1.getDeliveringCount()); + Assert.assertEquals(0, queueOnServer2.getDeliveringCount()); + Assert.assertEquals(0, queueOnServer1.getDurableDeliveringCount()); + Assert.assertEquals(0, queueOnServer2.getDurableDeliveringCount()); + Assert.assertEquals(0, queueOnServer1.getDurableDeliveringSize()); + Assert.assertEquals(0, queueOnServer2.getDurableDeliveringSize()); + Assert.assertEquals(0, queueOnServer1.getDeliveringSize()); + Assert.assertEquals(0, queueOnServer2.getDeliveringSize()); + server_2.stop(); server.stop(); } @@ -500,7 +922,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport { try (Connection connection = factory1.createConnection()) { org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName); - Wait.assertEquals(0, serverQueue::getMessageCount); + Wait.assertEquals(0L, serverQueue::getMessageCount, 2000, 100); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java index 5c23c5e04d..cd72dca36a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MirrorControllerBasicTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorMessageFactory; @@ -92,7 +93,7 @@ public class MirrorControllerBasicTest extends ActiveMQTestBase { server.addAddressInfo(new AddressInfo("test").addRoutingType(RoutingType.ANYCAST)); server.createQueue(new QueueConfiguration("test").setAddress("test").setRoutingType(RoutingType.ANYCAST)); - Message message = AMQPMirrorMessageFactory.createMessage("test", SimpleString.toSimpleString("ad1"), SimpleString.toSimpleString("qu1"), "test", "someUID", "body-test"); + Message message = AMQPMirrorMessageFactory.createMessage("test", SimpleString.toSimpleString("ad1"), SimpleString.toSimpleString("qu1"), "test", "someUID", "body-test", AckReason.KILLED); AMQPMirrorControllerSource.route(server, message); AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), null, null); @@ -108,6 +109,9 @@ public class MirrorControllerBasicTest extends ActiveMQTestBase { Assert.assertEquals("qu1", amqpMessage.getMessageAnnotation(AMQPMirrorControllerSource.QUEUE.toString())); Assert.assertEquals("someUID", amqpMessage.getMessageAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString())); Assert.assertEquals("test", amqpMessage.getMessageAnnotation(AMQPMirrorControllerSource.EVENT_TYPE.toString())); + Number ackReason = (Number)amqpMessage.getMessageAnnotation("x-opt-amq-mr-ack-reason"); + Assert.assertEquals(AckReason.KILLED.getVal(), ackReason.byteValue()); + Assert.assertEquals(AckReason.KILLED, AckReason.fromValue(ackReason.byteValue())); connection.close(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 5552080a6e..10299cb05f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -396,7 +396,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception { + public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer, boolean decDel) throws Exception { // no-op } @@ -492,7 +492,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception { + public void expire(final MessageReference ref, final ServerConsumer consumer, boolean decDel) throws Exception { // no-op } @@ -966,6 +966,10 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return null; } + @Override + public void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering) { + } + @Override public void postAcknowledge(MessageReference ref, AckReason reason) { }