This closes #2229
This commit is contained in:
commit
c8fc7c64f3
|
@ -521,6 +521,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
ref.getQueue().acknowledge(ref);
|
||||
pendingAcks.countDown();
|
||||
metrics.incrementMessagesAcknowledged();
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref));
|
||||
}
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
|
||||
|
@ -614,13 +618,17 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
pendingAcks.countUp();
|
||||
|
||||
try {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref));
|
||||
}
|
||||
|
||||
final HandleStatus status;
|
||||
if (message.isLargeMessage()) {
|
||||
deliveringLargeMessage = true;
|
||||
deliverLargeMessage(dest, ref, (LargeServerMessage) message);
|
||||
status = HandleStatus.HANDLED;
|
||||
} else {
|
||||
status = deliverStandardMessage(dest, ref, message);
|
||||
status = deliverStandardMessage(dest, ref, message);
|
||||
}
|
||||
|
||||
//Only increment messages pending acknowledgement if handled by bridge
|
||||
|
@ -628,6 +636,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
metrics.incrementMessagesPendingAcknowledgement();
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status));
|
||||
}
|
||||
|
||||
return status;
|
||||
} catch (Exception e) {
|
||||
// If an exception happened, we must count down immediately
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
|||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.QueueConfig;
|
||||
|
@ -610,7 +611,6 @@ public interface ActiveMQServerPlugin {
|
|||
this.messageAcknowledged(ref, reason);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Before a bridge is deployed
|
||||
*
|
||||
|
@ -631,6 +631,41 @@ public interface ActiveMQServerPlugin {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately before a bridge delivers a message
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately after a bridge delivers a message but before the message
|
||||
* is acknowledged
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @param status
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after delivered message over this bridge has been acknowledged by the remote broker
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A Critical failure has been detected.
|
||||
* This will be called before the broker is stopped
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.plugin;
|
||||
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ACKNOWLEDGE_BRIDGE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_BINDING;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
|
||||
|
@ -25,6 +26,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
|
|||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER_BRIDGE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
|
||||
|
@ -42,6 +44,7 @@ import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledV
|
|||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER_BRIDGE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
|
||||
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
|
||||
|
@ -70,6 +73,13 @@ import javax.jms.Topic;
|
|||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
|
@ -78,10 +88,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -285,6 +297,7 @@ public class CorePluginTest extends JMSTestBase {
|
|||
|
||||
Map<String, Object> server0Params = new HashMap<>();
|
||||
server0 = createClusteredServerWithParams(false, 0, false, server0Params);
|
||||
server0.registerBrokerPlugin(verifier);
|
||||
|
||||
Map<String, Object> server1Params = new HashMap<>();
|
||||
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
|
||||
|
@ -293,23 +306,21 @@ public class CorePluginTest extends JMSTestBase {
|
|||
final String testAddress = "testAddress";
|
||||
final String queueName0 = "queue0";
|
||||
final String forwardAddress = "forwardAddress";
|
||||
final String queueName1 = "queue1";
|
||||
|
||||
TransportConfiguration server0tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server0Params);
|
||||
TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
|
||||
|
||||
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
|
||||
connectors.put(server1tc.getName(), server1tc);
|
||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||
server0.registerBrokerPlugin(verifier);
|
||||
|
||||
final int messageSize = 1024;
|
||||
final int numMessages = 10;
|
||||
|
||||
ArrayList<String> connectorConfig = new ArrayList<>();
|
||||
connectorConfig.add(server1tc.getName());
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1")
|
||||
.setQueueName(queueName0)
|
||||
.setForwardingAddress(forwardAddress)
|
||||
.setRetryInterval(1000)
|
||||
.setReconnectAttemptsOnSameNode(-1)
|
||||
.setUseDuplicateDetection(false)
|
||||
.setStaticConnectors(connectorConfig);
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig);
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
|
@ -320,14 +331,59 @@ public class CorePluginTest extends JMSTestBase {
|
|||
queueConfigs0.add(queueConfig0);
|
||||
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
|
||||
|
||||
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
|
||||
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
|
||||
queueConfigs1.add(queueConfig1);
|
||||
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
|
||||
|
||||
server1.start();
|
||||
server0.start();
|
||||
|
||||
ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
|
||||
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
|
||||
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
|
||||
|
||||
ClientSession session0 = sf0.createSession(false, true, true);
|
||||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
|
||||
ClientConsumer consumer1 = session1.createConsumer(queueName1);
|
||||
session1.start();
|
||||
|
||||
final byte[] bytes = new byte[messageSize];
|
||||
final SimpleString propKey = new SimpleString("testkey");
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session0.createMessage(true);
|
||||
message.putIntProperty(propKey, i);
|
||||
message.getBodyBuffer().writeBytes(bytes);
|
||||
producer0.send(message);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
session0.close();
|
||||
session1.close();
|
||||
|
||||
sf0.close();
|
||||
sf1.close();
|
||||
|
||||
assertEquals(1, server0.getClusterManager().getBridges().size());
|
||||
BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
|
||||
assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
|
||||
assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
|
||||
|
||||
//verify plugin method calls
|
||||
verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
|
||||
verifier.validatePluginMethodsEquals(10, BEFORE_DELIVER_BRIDGE, AFTER_DELIVER_BRIDGE, AFTER_ACKNOWLEDGE_BRIDGE);
|
||||
|
||||
server0.stop();
|
||||
server1.stop();
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
|||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.QueueConfig;
|
||||
|
@ -95,6 +96,9 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
|
|||
public static final String AFTER_DELIVER = "afterDeliver";
|
||||
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
|
||||
public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
|
||||
public static final String BEFORE_DELIVER_BRIDGE = "beforeDeliverBridge";
|
||||
public static final String AFTER_DELIVER_BRIDGE = "afterDeliverBridge";
|
||||
public static final String AFTER_ACKNOWLEDGE_BRIDGE = "afterAcknowledgeBridge";
|
||||
|
||||
public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
|
||||
super();
|
||||
|
@ -340,6 +344,24 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
|
|||
methodCalled(AFTER_DEPLOY_BRIDGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
Preconditions.checkNotNull(bridge);
|
||||
methodCalled(BEFORE_DELIVER_BRIDGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
|
||||
Preconditions.checkNotNull(bridge);
|
||||
methodCalled(AFTER_DELIVER_BRIDGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
Preconditions.checkNotNull(bridge);
|
||||
methodCalled(AFTER_ACKNOWLEDGE_BRIDGE);
|
||||
}
|
||||
|
||||
public void validatePluginMethodsEquals(int count, String... names) {
|
||||
Arrays.asList(names).forEach(name -> {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue