diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index 2068d116eb..94be0e86ec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.BrokerConnection; import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.mirror.MirrorController; @@ -204,11 +205,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) { if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) { - connectSender(queue, queue.getAddress().toString(), Symbol.valueOf("qd.waypoint")); + connectSender(queue, queue.getAddress().toString(), null, Symbol.valueOf("qd.waypoint")); connectReceiver(protonRemotingConnection, session, sessionContext, queue, Symbol.valueOf("qd.waypoint")); } else { if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { - connectSender(queue, queue.getAddress().toString()); + connectSender(queue, queue.getAddress().toString(), null); } if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { connectReceiver(protonRemotingConnection, session, sessionContext, queue); @@ -278,7 +279,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, AMQPMirrorBrokerConnectionElement replica = (AMQPMirrorBrokerConnectionElement)connectionElement; Queue queue = server.locateQueue(replica.getSourceMirrorAddress()); - connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS); + connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS, (r) -> AMQPMirrorControllerSource.validateProtocolData(r, replica.getSourceMirrorAddress())); } } } @@ -457,6 +458,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, private void connectSender(Queue queue, String targetName, + java.util.function.Consumer beforeDeliver, Symbol... capabilities) { if (logger.isDebugEnabled()) { logger.debug("Connecting outbound for " + queue); @@ -491,7 +493,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, AMQPOutgoingController outgoingInitializer = new AMQPOutgoingController(queue, sender, sessionContext.getSessionSPI()); - ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer); + ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver); sessionContext.addSender(sender, senderContext); } catch (Exception e) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 86dda6d37f..966a6075fd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; @@ -155,28 +156,42 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom MessageReference ref = MessageReference.Factory.createReference(message, snfQueue); snfQueue.refUp(ref); - - Map daMap = new HashMap<>(); - DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap); - daMap.put(INTERNAL_ID, message.getMessageID()); - String address = message.getAddress(); - if (address != null) { // this is the message that was set through routing - Properties amqpProperties = getProperties(message); - if (amqpProperties == null || !address.equals(amqpProperties.getTo())) { - // We set the internal destination property only if we need to - // otherwise we just use the one already set over Properties - daMap.put(INTERNAL_DESTINATION, message.getAddress()); - } - } - ref.setProtocolData(deliveryAnnotations); - refs.add(ref); message.usageUp(); + + setProtocolData(ref); + + if (message.isDurable() && snfQueue.isDurable()) { + PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true); + } + } catch (Throwable e) { logger.warn(e.getMessage(), e); } } + public static void validateProtocolData(MessageReference ref, SimpleString snfAddress) { + if (ref.getProtocolData() == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { + setProtocolData(ref); + } + } + + private static void setProtocolData(MessageReference ref) { + Map daMap = new HashMap<>(); + DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap); + daMap.put(INTERNAL_ID, ref.getMessage().getMessageID()); + String address = ref.getMessage().getAddress(); + if (address != null) { // this is the message that was set through routing + Properties amqpProperties = getProperties(ref.getMessage()); + if (amqpProperties == null || !address.equals(amqpProperties.getTo())) { + // We set the internal destination property only if we need to + // otherwise we just use the one already set over Properties + daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress()); + } + } + ref.setProtocolData(deliveryAnnotations); + } + private static Properties getProperties(Message message) { if (message instanceof AMQPMessage) { return AMQPMessageBrokerAccessor.getCurrentProperties((AMQPMessage)message); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ffcf5af29a..0e138d8072 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -135,6 +135,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr * */ private final Object creditsLock = new Object(); private final java.util.function.Consumer executeDelivery; + private java.util.function.Consumer beforeDelivery; private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed; public ProtonServerSenderContext(AMQPConnectionContext connection, @@ -160,6 +161,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr .isAmqpTreatRejectAsUnmodifiedDeliveryFailed(); } + public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer beforeDelivery) { + this.beforeDelivery = beforeDelivery; + return this; + } + public Object getBrokerConsumer() { return brokerConsumer; } @@ -492,6 +498,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return 0; } + if (beforeDelivery != null) { + beforeDelivery.accept(messageReference); + } + try { synchronized (creditsLock) { if (sender.getLocalState() == EndpointState.CLOSED) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 365ce0120a..a4c2605278 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1519,7 +1519,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { if (message.isLargeMessage()) { - confirmLargeMessageSend(tx, message); + confirmLargeMessageSend(storageManager, tx, message); } // We need to kick delivery so the Queues may check for the cursors case they are empty @@ -1595,24 +1595,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding refs.add(reference); queue.refUp(reference); if (message.isDurable()) { - final int durableRefCount = queue.durableUp(message); - if (durableRefCount == 1) { - if (tx != null) { - storageManager.storeMessageTransactional(tx.getID(), message); - } else { - storageManager.storeMessage(message); - } - if (message.isLargeMessage()) { - confirmLargeMessageSend(tx, message); - } - } - if (tx != null) { - storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID()); - tx.setContainsPersistent(); - } else { - final boolean last = i == (durableQueuesCount - 1); - storageManager.storeReference(queue.getID(), message.getMessageID(), last); - } + storeDurableReference(storageManager, message, tx, queue, durableQueuesCount == i); if (deliveryTime != null && deliveryTime > 0) { if (tx != null) { storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); @@ -1624,12 +1607,36 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + public static void storeDurableReference(StorageManager storageManager, Message message, + Transaction tx, + Queue queue, boolean sync) throws Exception { + assert message.isDurable(); + + final int durableRefCount = queue.durableUp(message); + if (durableRefCount == 1) { + if (tx != null) { + storageManager.storeMessageTransactional(tx.getID(), message); + } else { + storageManager.storeMessage(message); + } + if (message.isLargeMessage()) { + confirmLargeMessageSend(storageManager, tx, message); + } + } + if (tx != null) { + storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID()); + tx.setContainsPersistent(); + } else { + storageManager.storeReference(queue.getID(), message.getMessageID(), sync); + } + } + /** * @param tx * @param message * @throws Exception */ - private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception { + private static void confirmLargeMessageSend(StorageManager storageManager, Transaction tx, final Message message) throws Exception { LargeServerMessage largeServerMessage = (LargeServerMessage) message; synchronized (largeServerMessage) { if (largeServerMessage.getPendingRecordID() >= 0) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index 80a08e77c7..8483d75aff 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -446,6 +446,78 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { } } + @Test + public void testRouteSurviving() throws Exception { + testRouteSurvivor(false); + } + + @Test + public void testRouteSurvivingStop() throws Exception { + testRouteSurvivor(true); + } + + + private void testRouteSurvivor(boolean server1Stopped) throws Exception { + if (!server1Stopped) { + server.start(); + } + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + server_2.getConfiguration().setName("thisone"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("OtherSide", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setSourceMirrorAddress("TheSource"); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + server_2.start(); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false)); + + int NUMBER_OF_MESSAGES = 200; + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("i=" + i)); + } + + connection.close(); + + { + if (!server1Stopped) { + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Queue queueServer1 = server.locateQueue(getQueueName()); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer1::getMessageCount); + } + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Queue queueServer2 = server_2.locateQueue(getQueueName()); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount); + } + + if (!server1Stopped) { + server.stop(); + } + server_2.stop(); + + server.start(); + server_2.start(); + + + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Queue queueServer1 = server.locateQueue(getQueueName()); + Queue queueServer2 = server_2.locateQueue(getQueueName()); + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer1::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount); + } private void replicaTest(boolean largeMessage,