This commit is contained in:
Clebert Suconic 2018-01-25 13:34:34 -05:00
commit c671fa078e
3 changed files with 21 additions and 6 deletions

View File

@ -752,7 +752,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
boolean rejectDuplicates, boolean rejectDuplicates,
final Binding bindingMove) throws Exception { final Binding bindingMove) throws Exception {
RoutingStatus result = RoutingStatus.OK; RoutingStatus result;
// Sanity check // Sanity check
if (message.getRefCount() > 0) { if (message.getRefCount() > 0) {
throw new IllegalStateException("Message cannot be routed more than once"); throw new IllegalStateException("Message cannot be routed more than once");
@ -772,6 +772,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.cleanupInternalProperties(); message.cleanupInternalProperties();
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message)); Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message));
// TODO auto-create queues here? // TODO auto-create queues here?
@ -840,11 +842,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
} }
} else { } else {
result = RoutingStatus.OK;
try { try {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates) : null);
processRoute(message, context, direct); processRoute(message, context, direct);
final RoutingStatus finalResult = result;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalResult) : null);
} catch (ActiveMQAddressFullException e) { } catch (ActiveMQAddressFullException e) {
if (startedTX.get()) { if (startedTX.get()) {
context.getTransaction().rollback(); context.getTransaction().rollback();
@ -858,6 +858,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (startedTX.get()) { if (startedTX.get()) {
context.getTransaction().commit(); context.getTransaction().commit();
} }
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result) : null);
return result; return result;
} }

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
@ -40,6 +41,7 @@ public class ConfigurationVerifier implements ActiveMQServerPlugin, Serializable
public String value2; public String value2;
public String value3; public String value3;
public AtomicInteger afterSendCounter = new AtomicInteger(); public AtomicInteger afterSendCounter = new AtomicInteger();
public AtomicInteger successRoutedCounter = new AtomicInteger();
@Override @Override
public void init(Map<String, String> properties) { public void init(Map<String, String> properties) {
@ -61,4 +63,12 @@ public class ConfigurationVerifier implements ActiveMQServerPlugin, Serializable
afterSendCounter.incrementAndGet(); afterSendCounter.incrementAndGet();
} }
@Override
public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
RoutingStatus result) throws ActiveMQException {
if (result == RoutingStatus.OK) {
successRoutedCounter.incrementAndGet();
}
}
} }

View File

@ -129,12 +129,14 @@ public class CorePluginTest extends JMSTestBase {
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS); BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
AFTER_MESSAGE_ROUTE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION); AFTER_CLOSE_SESSION);
verifier.validatePluginMethodsAtLeast(1, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE);
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get()); assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get());
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get());
assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1); assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1);
} }