ARTEMIS-2018 - Add bridge events to plugin API

Add callbacks to handle bridge events including beforeDeliverBridge,
afterDeliverBridge and afterAcknowledgeBridge
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-08-08 14:10:22 -04:00 committed by Michael Andre Pearce
parent 24c13fa4e0
commit e915545278
4 changed files with 136 additions and 11 deletions

View File

@ -521,6 +521,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
ref.getQueue().acknowledge(ref);
pendingAcks.countDown();
metrics.incrementMessagesAcknowledged();
if (server.hasBrokerPlugins()) {
server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref));
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
@ -614,13 +618,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
pendingAcks.countUp();
try {
if (server.hasBrokerPlugins()) {
server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref));
}
final HandleStatus status;
if (message.isLargeMessage()) {
deliveringLargeMessage = true;
deliverLargeMessage(dest, ref, (LargeServerMessage) message);
status = HandleStatus.HANDLED;
} else {
status = deliverStandardMessage(dest, ref, message);
status = deliverStandardMessage(dest, ref, message);
}
//Only increment messages pending acknowledgement if handled by bridge
@ -628,6 +636,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
metrics.incrementMessagesPendingAcknowledgement();
}
if (server.hasBrokerPlugins()) {
server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status));
}
return status;
} catch (Exception e) {
// If an exception happened, we must count down immediately

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@ -610,7 +611,6 @@ public interface ActiveMQServerPlugin {
this.messageAcknowledged(ref, reason);
}
/**
* Before a bridge is deployed
*
@ -631,6 +631,41 @@ public interface ActiveMQServerPlugin {
}
/**
* Called immediately before a bridge delivers a message
*
* @param bridge
* @param ref
* @throws ActiveMQException
*/
default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
}
/**
* Called immediately after a bridge delivers a message but before the message
* is acknowledged
*
* @param bridge
* @param ref
* @param status
* @throws ActiveMQException
*/
default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
}
/**
* Called after delivered message over this bridge has been acknowledged by the remote broker
*
* @param bridge
* @param ref
* @throws ActiveMQException
*/
default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
}
/**
* A Critical failure has been detected.
* This will be called before the broker is stopped

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ACKNOWLEDGE_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_BINDING;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
@ -25,6 +26,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
@ -42,6 +44,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
@ -70,6 +73,13 @@ import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@ -78,10 +88,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -285,6 +297,7 @@ public class CorePluginTest extends JMSTestBase {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(false, 0, false, server0Params);
server0.registerBrokerPlugin(verifier);
Map<String, Object> server1Params = new HashMap<>();
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@ -293,23 +306,21 @@ public class CorePluginTest extends JMSTestBase {
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server0Params);
TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
server0.registerBrokerPlugin(verifier);
final int messageSize = 1024;
final int numMessages = 10;
ArrayList<String> connectorConfig = new ArrayList<>();
connectorConfig.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
.setQueueName(queueName0)
.setForwardingAddress(forwardAddress)
.setRetryInterval(1000)
.setReconnectAttemptsOnSameNode(-1)
.setUseDuplicateDetection(false)
.setStaticConnectors(connectorConfig);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
@ -320,14 +331,59 @@ public class CorePluginTest extends JMSTestBase {
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session1 = sf1.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
final byte[] bytes = new byte[messageSize];
final SimpleString propKey = new SimpleString("testkey");
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
message.getBodyBuffer().writeBytes(bytes);
producer0.send(message);
}
for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer1.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey));
message.acknowledge();
}
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
assertEquals(1, server0.getClusterManager().getBridges().size());
BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
//verify plugin method calls
verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsEquals(10, BEFORE_DELIVER_BRIDGE, AFTER_DELIVER_BRIDGE, AFTER_ACKNOWLEDGE_BRIDGE);
server0.stop();
server1.stop();
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
@ -95,6 +96,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String AFTER_DELIVER = "afterDeliver";
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge";
public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge";
public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge";
public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
super();
@ -340,6 +344,24 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
methodCalled(AFTER_DEPLOY_BRIDGE);
}
@Override
public void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
Preconditions.checkNotNull(bridge);
methodCalled(BEFORE_DELIVER_BRIDGE);
}
@Override
public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
Preconditions.checkNotNull(bridge);
methodCalled(AFTER_DELIVER_BRIDGE);
}
@Override
public void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
Preconditions.checkNotNull(bridge);
methodCalled(AFTER_ACKNOWLEDGE_BRIDGE);
}
public void validatePluginMethodsEquals(int count, String... names) {
Arrays.asList(names).forEach(name -> {
try {