ARTEMIS-1258 - Add ServerSession as an argument to beforeSend and
afterSend
This commit is contained in:
parent
437232b50e
commit
a538b969c0
|
@ -16,10 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import javax.json.JsonArrayBuilder;
|
||||
import javax.json.JsonObjectBuilder;
|
||||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.Xid;
|
||||
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -31,6 +29,11 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.json.JsonArrayBuilder;
|
||||
import javax.json.JsonObjectBuilder;
|
||||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.Xid;
|
||||
|
||||
import org.apache.activemq.artemis.Closeable;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||
|
@ -89,8 +92,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
|
|||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
|
||||
|
||||
/**
|
||||
* Server side Session implementation
|
||||
*/
|
||||
|
@ -1300,7 +1301,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
final boolean direct,
|
||||
boolean noAutoCreateQueue) throws Exception {
|
||||
|
||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(tx, message, direct, noAutoCreateQueue) : null);
|
||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue) : null);
|
||||
|
||||
// If the protocol doesn't support flow control, we have no choice other than fail the communication
|
||||
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
|
||||
|
@ -1351,7 +1352,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
}
|
||||
|
||||
final RoutingStatus finalResult = result;
|
||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(tx, message, direct, noAutoCreateQueue, finalResult) : null);
|
||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, finalResult) : null);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -228,11 +228,45 @@ public interface ActiveMQServerPlugin {
|
|||
/**
|
||||
* Before a message is sent
|
||||
*
|
||||
* @param session the session that sends the message
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
*/
|
||||
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeSend(tx, message, direct, noAutoCreateQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is sent
|
||||
*
|
||||
* @param session the session that sends the message
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @param result
|
||||
*/
|
||||
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
RoutingStatus result) {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Before a message is sent
|
||||
*
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
*
|
||||
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
|
||||
|
||||
}
|
||||
|
@ -245,7 +279,10 @@ public interface ActiveMQServerPlugin {
|
|||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @param result
|
||||
*
|
||||
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
RoutingStatus result) {
|
||||
|
||||
|
|
|
@ -209,13 +209,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) {
|
||||
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
|
||||
boolean noAutoCreateQueue) {
|
||||
Preconditions.checkNotNull(message);
|
||||
methodCalled(BEFORE_SEND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
public void afterSend(ServerSession session, Transaction tx, Message message, boolean direct,
|
||||
boolean noAutoCreateQueue,
|
||||
RoutingStatus result) {
|
||||
Preconditions.checkNotNull(message);
|
||||
Preconditions.checkNotNull(result);
|
||||
|
|
Loading…
Reference in New Issue