diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index c811b6356c..d2c886b068 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -521,6 +521,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled ref.getQueue().acknowledge(ref); pendingAcks.countDown(); metrics.incrementMessagesAcknowledged(); + + if (server.hasBrokerPlugins()) { + server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref)); + } } else { if (logger.isTraceEnabled()) { logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message); @@ -614,13 +618,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled pendingAcks.countUp(); try { + if (server.hasBrokerPlugins()) { + server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref)); + } + final HandleStatus status; if (message.isLargeMessage()) { deliveringLargeMessage = true; deliverLargeMessage(dest, ref, (LargeServerMessage) message); status = HandleStatus.HANDLED; } else { - status = deliverStandardMessage(dest, ref, message); + status = deliverStandardMessage(dest, ref, message); } //Only increment messages pending acknowledgement if handled by bridge @@ -628,6 +636,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled metrics.incrementMessagesPendingAcknowledgement(); } + if (server.hasBrokerPlugins()) { + server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status)); + } + return status; } catch (Exception e) { // If an exception happened, we must count down immediately diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java index 97e23b48fe..704420e88b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; @@ -610,7 +611,6 @@ public interface ActiveMQServerPlugin { this.messageAcknowledged(ref, reason); } - /** * Before a bridge is deployed * @@ -631,6 +631,41 @@ public interface ActiveMQServerPlugin { } + /** + * Called immediately before a bridge delivers a message + * + * @param bridge + * @param ref + * @throws ActiveMQException + */ + default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException { + + } + + /** + * Called immediately after a bridge delivers a message but before the message + * is acknowledged + * + * @param bridge + * @param ref + * @param status + * @throws ActiveMQException + */ + default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException { + + } + + /** + * Called after delivered message over this bridge has been acknowledged by the remote broker + * + * @param bridge + * @param ref + * @throws ActiveMQException + */ + default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException { + + } + /** * A Critical failure has been detected. * This will be called before the broker is stopped diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java index f83b3eef25..a236b8e3d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.plugin; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ACKNOWLEDGE_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_BINDING; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; @@ -25,6 +26,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE; @@ -42,6 +44,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; @@ -70,6 +73,13 @@ import javax.jms.Topic; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; @@ -78,10 +88,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -285,6 +297,7 @@ public class CorePluginTest extends JMSTestBase { Map server0Params = new HashMap<>(); server0 = createClusteredServerWithParams(false, 0, false, server0Params); + server0.registerBrokerPlugin(verifier); Map server1Params = new HashMap<>(); server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1); @@ -293,23 +306,21 @@ public class CorePluginTest extends JMSTestBase { final String testAddress = "testAddress"; final String queueName0 = "queue0"; final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server0Params); TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params); HashMap connectors = new HashMap<>(); connectors.put(server1tc.getName(), server1tc); server0.getConfiguration().setConnectorConfigurations(connectors); - server0.registerBrokerPlugin(verifier); + + final int messageSize = 1024; + final int numMessages = 10; ArrayList connectorConfig = new ArrayList<>(); connectorConfig.add(server1tc.getName()); - BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1") - .setQueueName(queueName0) - .setForwardingAddress(forwardAddress) - .setRetryInterval(1000) - .setReconnectAttemptsOnSameNode(-1) - .setUseDuplicateDetection(false) - .setStaticConnectors(connectorConfig); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig); List bridgeConfigs = new ArrayList<>(); bridgeConfigs.add(bridgeConfiguration); @@ -320,14 +331,59 @@ public class CorePluginTest extends JMSTestBase { queueConfigs0.add(queueConfig0); server0.getConfiguration().setQueueConfigurations(queueConfigs0); + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1); + List queueConfigs1 = new ArrayList<>(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + server1.start(); server0.start(); + ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); + ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); + + ClientSession session0 = sf0.createSession(false, true, true); + ClientSession session1 = sf1.createSession(false, true, true); + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + ClientConsumer consumer1 = session1.createConsumer(queueName1); + session1.start(); + + final byte[] bytes = new byte[messageSize]; + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session0.createMessage(true); + message.putIntProperty(propKey, i); + message.getBodyBuffer().writeBytes(bytes); + producer0.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer1.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getObjectProperty(propKey)); + message.acknowledge(); + } + + Assert.assertNull(consumer1.receiveImmediate()); + session0.close(); + session1.close(); + + sf0.close(); + sf1.close(); + + assertEquals(1, server0.getClusterManager().getBridges().size()); + BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics(); + assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement()); + assertEquals(10, bridgeMetrics.getMessagesAcknowledged()); + + //verify plugin method calls verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsEquals(10, BEFORE_DELIVER_BRIDGE, AFTER_DELIVER_BRIDGE, AFTER_ACKNOWLEDGE_BRIDGE); server0.stop(); server1.stop(); - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index 8977ba5f06..9c24505946 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; @@ -95,6 +96,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { public static final String AFTER_DELIVER = "afterDeliver"; public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge"; public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge"; + public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge"; + public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge"; + public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge"; public MethodCalledVerifier(Map methodCalls) { super(); @@ -340,6 +344,24 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { methodCalled(AFTER_DEPLOY_BRIDGE); } + @Override + public void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException { + Preconditions.checkNotNull(bridge); + methodCalled(BEFORE_DELIVER_BRIDGE); + } + + @Override + public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException { + Preconditions.checkNotNull(bridge); + methodCalled(AFTER_DELIVER_BRIDGE); + } + + @Override + public void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException { + Preconditions.checkNotNull(bridge); + methodCalled(AFTER_ACKNOWLEDGE_BRIDGE); + } + public void validatePluginMethodsEquals(int count, String... names) { Arrays.asList(names).forEach(name -> { try {