This commit is contained in:
Clebert Suconic 2017-06-27 12:42:41 -04:00
commit 12e7465283
3 changed files with 50 additions and 10 deletions

View File

@ -16,10 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import javax.json.JsonArrayBuilder; import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
import javax.json.JsonObjectBuilder;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -31,6 +29,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObjectBuilder;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
/** /**
* Server side Session implementation * Server side Session implementation
*/ */
@ -1300,7 +1301,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean direct, final boolean direct,
boolean noAutoCreateQueue) throws Exception { boolean noAutoCreateQueue) throws Exception {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue) : null);
// If the protocol doesn't support flow control, we have no choice other than fail the communication // If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
@ -1351,7 +1352,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
final RoutingStatus finalResult = result; final RoutingStatus finalResult = result;
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null); server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, finalResult) : null);
return result; return result;
} }

View File

@ -228,11 +228,45 @@ public interface ActiveMQServerPlugin {
/** /**
* Before a message is sent * Before a message is sent
* *
* @param session the session that sends the message
* @param tx * @param tx
* @param message * @param message
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
*/ */
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
//by default call the old method for backwards compatibility
this.beforeSend(tx, message, direct, noAutoCreateQueue);
}
/**
* After a message is sent
*
* @param session the session that sends the message
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param result
*/
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) {
//by default call the old method for backwards compatibility
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
}
/**
* Before a message is sent
*
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
*
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
*/
@Deprecated
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
} }
@ -245,7 +279,10 @@ public interface ActiveMQServerPlugin {
* @param direct * @param direct
* @param noAutoCreateQueue * @param noAutoCreateQueue
* @param result * @param result
*
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
*/ */
@Deprecated
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
RoutingStatus result) { RoutingStatus result) {

View File

@ -209,13 +209,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
} }
@Override @Override
public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue) {
Preconditions.checkNotNull(message); Preconditions.checkNotNull(message);
methodCalled(BEFORE_SEND); methodCalled(BEFORE_SEND);
} }
@Override @Override
public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue,
RoutingStatus result) { RoutingStatus result) {
Preconditions.checkNotNull(message); Preconditions.checkNotNull(message);
Preconditions.checkNotNull(result); Preconditions.checkNotNull(result);