ARTEMIS-1633 - fire message routing callbacks for all results

Make sure ActiveMQServer plugin implementations are always notified of
message route events
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-01-24 11:03:12 -05:00 committed by Clebert Suconic
parent a82ffbcb78
commit 37cc1e3e7a
3 changed files with 20 additions and 5 deletions

View File

@ -772,6 +772,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.cleanupInternalProperties();
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message));
// TODO auto-create queues here?
@ -841,10 +843,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
} else {
try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
processRoute(message, context, direct);
final RoutingStatus finalResult = result;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null);
} catch (ActiveMQAddressFullException e) {
if (startedTX.get()) {
context.getTransaction().rollback();
@ -858,6 +857,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (startedTX.get()) {
context.getTransaction().commit();
}
final RoutingStatus finalResult = result;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null);
return result;
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -40,6 +41,7 @@ public class ConfigurationVerifier implements ActiveMQServerPlugin, Serializable
public String value2;
public String value3;
public AtomicInteger afterSendCounter = new AtomicInteger();
public AtomicInteger successRoutedCounter = new AtomicInteger();
@Override
public void init(Map<String, String> properties) {
@ -61,4 +63,12 @@ public class ConfigurationVerifier implements ActiveMQServerPlugin, Serializable
afterSendCounter.incrementAndGet();
}
@Override
public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) throws ActiveMQException {
if (result == RoutingStatus.OK) {
successRoutedCounter.incrementAndGet();
}
}
}

View File

@ -129,12 +129,14 @@ public class CorePluginTest extends JMSTestBase {
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
verifier.validatePluginMethodsAtLeast(1, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE);
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get());
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get());
assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1);
}