diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java
new file mode 100644
index 0000000000..57c788b368
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.api.core;
+
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
+
+public interface BaseInterceptor
+{
+ /**
+ * Intercepts a packet which is received before it is sent to the channel
+ *
+ * @param packet the packet being received
+ * @param connection the connection the packet was received on
+ * @return {@code true} to process the next interceptor and handle the packet,
+ * {@code false} to abort processing of the packet
+ * @throws ActiveMQException
+ */
+ boolean intercept(P packet, RemotingConnection connection) throws ActiveMQException;
+
+}
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
index a1fa94a4ad..7cfee0b58d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java
@@ -17,7 +17,6 @@
package org.apache.activemq.api.core;
import org.apache.activemq.core.protocol.core.Packet;
-import org.apache.activemq.spi.core.protocol.RemotingConnection;
/**
* This is class is a simple way to intercepting calls on ActiveMQ client and servers.
@@ -26,16 +25,6 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection;
* {@literal activemq-configuration.xml}.
* To add it to a client, use {@link org.apache.activemq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)}
*/
-public interface Interceptor
+public interface Interceptor extends BaseInterceptor
{
- /**
- * Intercepts a packet which is received before it is sent to the channel
- *
- * @param packet the packet being received
- * @param connection the connection the packet was received on
- * @return {@code true} to process the next interceptor and handle the packet,
- * {@code false} to abort processing of the packet
- * @throws ActiveMQException
- */
- boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException;
}
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
index b3f3211153..dd88cab598 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.core.protocol.proton;
+import java.util.List;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
+import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.protocol.proton.converter.ProtonMessageConverter;
import org.apache.activemq.core.protocol.proton.plug.ActiveMQProtonConnectionCallback;
@@ -30,6 +33,7 @@ import org.apache.activemq.core.server.management.NotificationListener;
import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection;
@@ -39,14 +43,18 @@ import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
/**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ resources
*/
-public class ProtonProtocolManager implements ProtocolManager, NotificationListener
+public class ProtonProtocolManager implements ProtocolManager, NotificationListener
{
private final ActiveMQServer server;
private MessageConverter protonConverter;
- public ProtonProtocolManager(ActiveMQServer server)
+
+ private final ProtonProtocolManagerFactory factory;
+
+ public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server)
{
+ this.factory = factory;
this.server = server;
this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
}
@@ -69,6 +77,18 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe
}
+ @Override
+ public ProtocolManagerFactory getFactory()
+ {
+ return factory;
+ }
+
+ @Override
+ public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors)
+ {
+ // no op
+ }
+
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
{
@@ -97,7 +117,7 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe
@Override
public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer)
{
- ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection)connection;
+ ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
protonConnection.bufferReceived(protonConnection.getID(), buffer);
}
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
index c83f10cd3e..6fe25765ca 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java
@@ -16,14 +16,16 @@
*/
package org.apache.activemq.core.protocol.proton;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
+import java.util.Collections;
import java.util.List;
-public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
+public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
{
private static final String AMQP_PROTOCOL_NAME = "AMQP";
@@ -32,7 +34,14 @@ public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
@Override
public ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors)
{
- return new ProtonProtocolManager(server);
+ return new ProtonProtocolManager(this, server);
+ }
+
+ @Override
+ public List filterInterceptors(List interceptors)
+ {
+ // no interceptors on Proton
+ return Collections.emptyList();
}
@Override
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
index dfa4e4a8fa..74d7d77487 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.core.protocol.openwire;
+import javax.jms.InvalidClientIDException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -27,12 +28,12 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.jms.InvalidClientIDException;
-
import io.netty.channel.ChannelPipeline;
-
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
+import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -54,16 +55,6 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.core.server.ActiveMQServerLogger;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.journal.IOAsyncTask;
import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.core.protocol.openwire.amq.AMQPersistenceAdapter;
@@ -74,16 +65,26 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat
import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.core.security.CheckType;
import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection;
import org.apache.activemq.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.state.SessionState;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.LongSequenceGenerator;
-public class OpenWireProtocolManager implements ProtocolManager
+public class OpenWireProtocolManager implements ProtocolManager
{
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -91,6 +92,8 @@ public class OpenWireProtocolManager implements ProtocolManager
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQServer server;
+ private final OpenWireProtocolManagerFactory factory;
+
private OpenWireFormatFactory wireFactory;
private boolean tightEncodingEnabled = true;
@@ -104,7 +107,7 @@ public class OpenWireProtocolManager implements ProtocolManager
// from broker
protected final Map brokerConnectionStates = Collections
- .synchronizedMap(new HashMap());
+ .synchronizedMap(new HashMap());
private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
@@ -118,8 +121,9 @@ public class OpenWireProtocolManager implements ProtocolManager
private Map transactions = new ConcurrentHashMap();
- public OpenWireProtocolManager(ActiveMQServer server)
+ public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
{
+ this.factory = factory;
this.server = server;
this.wireFactory = new OpenWireFormatFactory();
// preferred prop, should be done via config
@@ -128,17 +132,30 @@ public class OpenWireProtocolManager implements ProtocolManager
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
}
+
+ public ProtocolManagerFactory getFactory()
+ {
+ return factory;
+ }
+
+
+ @Override
+ public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors)
+ {
+ // NO-OP
+ }
+
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
- Connection connection)
+ Connection connection)
{
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed,
- connection, this, wf);
+ connection, this, wf);
owConn.init();
return new ConnectionEntry(owConn, null, System.currentTimeMillis(),
- 1 * 60 * 1000);
+ 1 * 60 * 1000);
}
@Override
@@ -171,7 +188,7 @@ public class OpenWireProtocolManager implements ProtocolManager
if (array.length < 8)
{
throw new IllegalArgumentException("Protocol header length changed "
- + array.length);
+ + array.length);
}
int start = this.prefixPacketSize ? 4 : 0;
@@ -207,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager
}
public void handleCommand(OpenWireConnection openWireConnection,
- Object command)
+ Object command)
{
Command amqCmd = (Command) command;
byte type = amqCmd.getDataStructureType();
@@ -221,14 +238,14 @@ public class OpenWireProtocolManager implements ProtocolManager
}
public void sendReply(final OpenWireConnection connection,
- final Command command)
+ final Command command)
{
server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
{
public void onError(final int errorCode, final String errorMessage)
{
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode,
- errorMessage);
+ errorMessage);
}
public void done()
@@ -285,7 +302,7 @@ public class OpenWireProtocolManager implements ProtocolManager
if (clientId == null)
{
throw new InvalidClientIDException(
- "No clientID specified for connection request");
+ "No clientID specified for connection request");
}
synchronized (clientIdSet)
{
@@ -308,8 +325,8 @@ public class OpenWireProtocolManager implements ProtocolManager
else
{
throw new InvalidClientIDException("Broker: " + getBrokerName()
- + " - Client: " + clientId + " already connected from "
- + oldContext.getConnection().getRemoteAddress());
+ + " - Client: " + clientId + " already connected from "
+ + oldContext.getConnection().getRemoteAddress());
}
}
else
@@ -329,11 +346,11 @@ public class OpenWireProtocolManager implements ProtocolManager
// init the conn
addSessions(context.getConnection(), context.getConnectionState()
- .getSessionIds());
+ .getSessionIds());
}
private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
- Command copy) throws Exception
+ Command copy) throws Exception
{
this.fireAdvisory(context, topic, copy, null);
}
@@ -351,26 +368,26 @@ public class OpenWireProtocolManager implements ProtocolManager
* See AdvisoryBroker.fireAdvisory()
*/
private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
- Command command, ConsumerId targetConsumerId) throws Exception
+ Command command, ConsumerId targetConsumerId) throws Exception
{
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(
- AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(
- AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = "tcp://localhost:61616";
advisoryMessage.setStringProperty(
- AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
// set the data structure
advisoryMessage.setDataStructure(command);
advisoryMessage.setPersistent(false);
advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
advisoryMessage.setMessageId(new MessageId(advisoryProducerId,
- messageIdGenerator.getNextSequenceId()));
+ messageIdGenerator.getNextSequenceId()));
advisoryMessage.setTargetConsumerId(targetConsumerId);
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
@@ -402,7 +419,7 @@ public class OpenWireProtocolManager implements ProtocolManager
try
{
brokerName = InetAddressUtil.getLocalHostName().toLowerCase(
- Locale.ENGLISH);
+ Locale.ENGLISH);
}
catch (Exception e)
{
@@ -445,34 +462,34 @@ public class OpenWireProtocolManager implements ProtocolManager
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
AMQTransportConnectionState cs = theConn
- .lookupConnectionState(connectionId);
+ .lookupConnectionState(connectionId);
if (cs == null)
{
throw new IllegalStateException(
- "Cannot add a producer to a connection that had not been registered: "
- + connectionId);
+ "Cannot add a producer to a connection that had not been registered: "
+ + connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null)
{
throw new IllegalStateException(
- "Cannot add a producer to a session that had not been registered: "
- + sessionId);
+ "Cannot add a producer to a session that had not been registered: "
+ + sessionId);
}
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId()))
{
ActiveMQDestination destination = info.getDestination();
if (destination != null
- && !AdvisorySupport.isAdvisoryTopic(destination))
+ && !AdvisorySupport.isAdvisoryTopic(destination))
{
if (theConn.getProducerCount(connectionId) >= theConn
- .getMaximumProducersAllowedPerConnection())
+ .getMaximumProducersAllowedPerConnection())
{
throw new IllegalStateException(
- "Can't add producer on connection " + connectionId
- + ": at maximum limit: "
- + theConn.getMaximumProducersAllowedPerConnection());
+ "Can't add producer on connection " + connectionId
+ + ": at maximum limit: "
+ + theConn.getMaximumProducersAllowedPerConnection());
}
}
@@ -503,35 +520,35 @@ public class OpenWireProtocolManager implements ProtocolManager
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
AMQTransportConnectionState cs = theConn
- .lookupConnectionState(connectionId);
+ .lookupConnectionState(connectionId);
if (cs == null)
{
throw new IllegalStateException(
- "Cannot add a consumer to a connection that had not been registered: "
- + connectionId);
+ "Cannot add a consumer to a connection that had not been registered: "
+ + connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null)
{
throw new IllegalStateException(
- this.server
- + " Cannot add a consumer to a session that had not been registered: "
- + sessionId);
+ this.server
+ + " Cannot add a consumer to a session that had not been registered: "
+ + sessionId);
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId()))
{
ActiveMQDestination destination = info.getDestination();
if (destination != null
- && !AdvisorySupport.isAdvisoryTopic(destination))
+ && !AdvisorySupport.isAdvisoryTopic(destination))
{
if (theConn.getConsumerCount(connectionId) >= theConn
- .getMaximumConsumersAllowedPerConnection())
+ .getMaximumConsumersAllowedPerConnection())
{
throw new IllegalStateException(
- "Can't add consumer on connection " + connectionId
- + ": at maximum limit: "
- + theConn.getMaximumConsumersAllowedPerConnection());
+ "Can't add consumer on connection " + connectionId
+ + ": at maximum limit: "
+ + theConn.getMaximumConsumersAllowedPerConnection());
}
}
@@ -562,7 +579,7 @@ public class OpenWireProtocolManager implements ProtocolManager
{
SessionId sid = iter.next();
addSession(theConn, theConn.getState().getSessionState(sid).getInfo(),
- true);
+ true);
}
}
@@ -572,10 +589,10 @@ public class OpenWireProtocolManager implements ProtocolManager
}
public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss,
- boolean internal)
+ boolean internal)
{
AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss,
- server, theConn, this);
+ server, theConn, this);
amqSession.initialize();
amqSession.setInternal(internal);
sessions.put(ss.getSessionId(), amqSession);
@@ -583,7 +600,7 @@ public class OpenWireProtocolManager implements ProtocolManager
}
public void removeConnection(AMQConnectionContext context,
- ConnectionInfo info, Throwable error)
+ ConnectionInfo info, Throwable error)
{
// todo roll back tx
this.connections.remove(context.getConnection());
@@ -630,13 +647,13 @@ public class OpenWireProtocolManager implements ProtocolManager
}
public void addDestination(OpenWireConnection connection,
- DestinationInfo info) throws Exception
+ DestinationInfo info) throws Exception
{
ActiveMQDestination dest = info.getDestination();
if (dest.isQueue())
{
SimpleString qName = new SimpleString("jms.queue."
- + dest.getPhysicalName());
+ + dest.getPhysicalName());
ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId());
ConnectionInfo connInfo = state.getInfo();
if (connInfo != null)
@@ -646,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager
AMQServerSession fakeSession = new AMQServerSession(user, pass);
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
- ((ActiveMQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession);
+ ((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession);
}
this.server.createQueue(qName, qName, null, false, true);
if (dest.isTemporary())
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
index 42f8f4d816..5593ee6200 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
@@ -16,14 +16,16 @@
*/
package org.apache.activemq.core.protocol.openwire;
+import java.util.Collections;
import java.util.List;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
-public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory
+public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory
{
public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
@@ -31,7 +33,13 @@ public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors)
{
- return new OpenWireProtocolManager(server);
+ return new OpenWireProtocolManager(this, server);
+ }
+
+ @Override
+ public List filterInterceptors(List interceptors)
+ {
+ return Collections.emptyList();
}
@Override
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
new file mode 100644
index 0000000000..619c29fe67
--- /dev/null
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.core.protocol.stomp;
+
+import org.apache.activemq.api.core.BaseInterceptor;
+
+/**
+ * This class is a simple way to intercepting client calls on ActiveMQ using STOMP protocol.
+ *
+ * To add an interceptor to ActiveMQ server, you have to modify the server configuration file
+ * {@literal activemq-configuration.xml}.
+ */
+public interface StompFrameInterceptor extends BaseInterceptor
+{
+}
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index 1b71987e2e..3780180faf 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -26,10 +26,9 @@ import java.util.Set;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline;
-
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.management.CoreNotificationType;
@@ -49,6 +48,7 @@ import org.apache.activemq.core.server.management.NotificationListener;
import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection;
@@ -62,7 +62,7 @@ import static org.apache.activemq.core.protocol.stomp.ActiveMQStompProtocolMessa
/**
* StompProtocolManager
*/
-class StompProtocolManager implements ProtocolManager, NotificationListener
+class StompProtocolManager implements ProtocolManager, NotificationListener
{
// Constants -----------------------------------------------------
@@ -70,6 +70,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
private final ActiveMQServer server;
+ private final StompProtocolManagerFactory factory;
+
private final Executor executor;
private final Map transactedSessions = new HashMap();
@@ -79,12 +81,16 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
private final Set destinations = new ConcurrentHashSet();
+ private final List incomingInterceptors;
+ private final List outgoingInterceptors;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public StompProtocolManager(final ActiveMQServer server, final List interceptors)
+ public StompProtocolManager(final StompProtocolManagerFactory factory, final ActiveMQServer server, final List incomingInterceptors, final List outgoingInterceptors)
{
+ this.factory = factory;
this.server = server;
this.executor = server.getExecutorFactory().getExecutor();
ManagementService service = server.getManagementService();
@@ -94,6 +100,24 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
destinations.add(service.getManagementAddress().toString());
service.addNotificationListener(this);
}
+ this.incomingInterceptors = incomingInterceptors;
+ this.outgoingInterceptors = outgoingInterceptors;
+ }
+
+ @Override
+ public ProtocolManagerFactory getFactory()
+ {
+ return factory;
+ }
+
+ @Override
+ public void updateInterceptors(List incoming, List outgoing)
+ {
+ this.incomingInterceptors.clear();
+ this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+
+ this.outgoingInterceptors.clear();
+ this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
@Override
@@ -166,6 +190,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
try
{
+ invokeInterceptors(this.incomingInterceptors, request, conn);
conn.handleFrame(request);
}
finally
@@ -201,6 +226,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
{
ActiveMQServerLogger.LOGGER.trace("sent " + frame);
}
+
+ invokeInterceptors(this.outgoingInterceptors, frame, connection);
+
synchronized (connection)
{
if (connection.isDestroyed())
@@ -336,7 +364,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
ActiveMQStompException e = new ActiveMQStompException("Error sending reply",
- ActiveMQExceptionType.createException(errorCode, errorMessage));
+ ActiveMQExceptionType.createException(errorCode, errorMessage));
StompFrame error = e.getFrame();
send(connection, error);
@@ -410,7 +438,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
if (stompSession.containsSubscription(subscriptionID))
{
throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID +
- ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+ ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
@@ -504,4 +532,25 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
{
return server;
}
+
+ private void invokeInterceptors(List interceptors, final StompFrame frame, final StompConnection connection)
+ {
+ if (interceptors != null && !interceptors.isEmpty())
+ {
+ for (StompFrameInterceptor interceptor : interceptors)
+ {
+ try
+ {
+ if (!interceptor.intercept(frame, connection))
+ {
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ ActiveMQServerLogger.LOGGER.error(e);
+ }
+ }
+ }
+ }
}
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
index 72e7734110..1edf561857 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java
@@ -18,20 +18,26 @@ package org.apache.activemq.core.protocol.stomp;
import java.util.List;
-import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
-public class StompProtocolManagerFactory implements ProtocolManagerFactory
+public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory
{
public static final String STOMP_PROTOCOL_NAME = "STOMP";
private static String[] SUPPORTED_PROTOCOLS = {STOMP_PROTOCOL_NAME};
- public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors)
+ public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors)
{
- return new StompProtocolManager(server, incomingInterceptors);
+ return new StompProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
+ }
+
+ @Override
+ public List filterInterceptors(List interceptors)
+ {
+ return filterInterceptors(StompFrameInterceptor.class, interceptors);
}
@Override
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
index 0785e3a0c2..0768ccf1b7 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java
@@ -25,11 +25,12 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClusterTopologyListener;
import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.client.ClusterTopologyListener;
import org.apache.activemq.api.core.client.TopologyMember;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.protocol.ServerPacketDecoder;
@@ -53,11 +54,12 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection;
-class CoreProtocolManager implements ProtocolManager
+class CoreProtocolManager implements ProtocolManager
{
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -67,8 +69,12 @@ class CoreProtocolManager implements ProtocolManager
private final List outgoingInterceptors;
- CoreProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors)
+ private final CoreProtocolManagerFactory protocolManagerFactory;
+
+ CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors)
{
+ this.protocolManagerFactory = factory;
+
this.server = server;
this.incomingInterceptors = incomingInterceptors;
@@ -76,8 +82,26 @@ class CoreProtocolManager implements ProtocolManager
this.outgoingInterceptors = outgoingInterceptors;
}
+
+ @Override
+ public ProtocolManagerFactory getFactory()
+ {
+ return protocolManagerFactory;
+ }
+
+ @Override
+ public void updateInterceptors(List incoming, List outgoing)
+ {
+ this.incomingInterceptors.clear();
+ this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+
+ this.outgoingInterceptors.clear();
+ this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
+ }
+
/**
* no need to implement this now
+ *
* @return
*/
@Override
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
index e9d0ac5fbb..147abe09a6 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java
@@ -18,19 +18,36 @@ package org.apache.activemq.core.protocol.core.impl;
import java.util.List;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
-public class CoreProtocolManagerFactory implements ProtocolManagerFactory
+public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory
{
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
+ /**
+ * {@inheritDoc} *
+ * @param server
+ * @param incomingInterceptors
+ * @param outgoingInterceptors
+ * @return
+ */
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors)
{
- return new CoreProtocolManager(server, incomingInterceptors, outgoingInterceptors);
+ return new CoreProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
+ }
+
+ @Override
+ public List filterInterceptors(List interceptors)
+ {
+ // This is using this tool method
+ // it wouldn't be possible to write a generic method without this class parameter
+ // and I didn't want to bloat the cllaers for this
+ return filterInterceptors(Interceptor.class, interceptors);
}
@Override
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
index 10b01062b1..147b39add7 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.core.config.Configuration;
@@ -53,8 +54,8 @@ import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.cluster.ClusterConnection;
import org.apache.activemq.core.server.cluster.ClusterManager;
-import org.apache.activemq.core.server.impl.ServiceRegistry;
import org.apache.activemq.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.core.server.impl.ServiceRegistry;
import org.apache.activemq.core.server.management.ManagementService;
import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.ProtocolManager;
@@ -65,9 +66,9 @@ import org.apache.activemq.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.spi.core.remoting.BufferHandler;
import org.apache.activemq.spi.core.remoting.Connection;
import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.apache.activemq.utils.ActiveMQThreadFactory;
import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.ConfigurationHelper;
-import org.apache.activemq.utils.ActiveMQThreadFactory;
import org.apache.activemq.utils.ReusableLatch;
public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
@@ -84,9 +85,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
private final Set acceptorsConfig;
- private final List incomingInterceptors = new CopyOnWriteArrayList();
+ private final List incomingInterceptors = new CopyOnWriteArrayList<>();
- private final List outgoingInterceptors = new CopyOnWriteArrayList();
+ private final List outgoingInterceptors = new CopyOnWriteArrayList<>();
private final Map acceptors = new HashMap();
@@ -147,7 +148,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]);
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0],
- coreProtocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
+ coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors),
+ coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
if (config.isResolveProtocols())
{
@@ -160,7 +162,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
for (String protocol : protocols)
{
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol);
- protocolMap.put(protocol, next.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
+ protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors),
+ next.filterInterceptors(outgoingInterceptors)));
}
}
}
@@ -190,11 +193,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors());
}
- private void addReflectivelyInstantiatedInterceptors(List classNames, List interceptors)
+ private void addReflectivelyInstantiatedInterceptors(List classNames, List interceptors)
{
for (String className : classNames)
{
- Interceptor interceptor = ((Interceptor) safeInitNewInstance(className));
+ BaseInterceptor interceptor = ((BaseInterceptor) safeInitNewInstance(className));
interceptors.add(interceptor);
}
}
@@ -221,8 +224,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
// to support many hundreds of connections, but the main thread pool must be kept small for better performance
ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
- "-" +
- System.identityHashCode(this), false, tccl);
+ "-" +
+ System.identityHashCode(this), false, tccl);
threadPool = Executors.newCachedThreadPool(tFactory);
@@ -620,24 +623,43 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
public void addIncomingInterceptor(final Interceptor interceptor)
{
incomingInterceptors.add(interceptor);
+
+ updateProtocols();
}
@Override
public boolean removeIncomingInterceptor(final Interceptor interceptor)
{
- return incomingInterceptors.remove(interceptor);
+ if (incomingInterceptors.remove(interceptor))
+ {
+ updateProtocols();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
@Override
public void addOutgoingInterceptor(final Interceptor interceptor)
{
outgoingInterceptors.add(interceptor);
+ updateProtocols();
}
@Override
public boolean removeOutgoingInterceptor(final Interceptor interceptor)
{
- return outgoingInterceptors.remove(interceptor);
+ if (outgoingInterceptors.remove(interceptor))
+ {
+ updateProtocols();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
private ClusterConnection lookupClusterConnection(TransportConfiguration acceptorConfig)
@@ -803,4 +825,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
});
}
+ protected void updateProtocols()
+ {
+ for (ProtocolManager> protocolManager : this.protocolMap.values())
+ {
+ protocolManager.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+ }
+
+ }
+
}
diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java
new file mode 100644
index 0000000000..df2d555ee9
--- /dev/null
+++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.spi.core.protocol;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.activemq.api.core.BaseInterceptor;
+
+public abstract class AbstractProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+ /**
+ * This method exists because java templates won't store the type of P at runtime.
+ * So it's not possible to write a generic method with having the Class Type.
+ * This will serve as a tool for sub classes to filter properly* * *
+ *
+ * @param type
+ * @param listIn
+ * @return
+ */
+ protected List
filterInterceptors(Class
type, List extends BaseInterceptor> listIn)
+ {
+ if (listIn == null)
+ {
+ return Collections.emptyList();
+ }
+ else
+ {
+ CopyOnWriteArrayList
listOut = new CopyOnWriteArrayList();
+ for (BaseInterceptor> in : listIn)
+ {
+ if (type.isInstance(in))
+ {
+ listOut.add((P) in);
+ }
+ }
+ return listOut;
+ }
+ }
+}
diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
index 254ac00a84..c07d1b751b 100644
--- a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java
@@ -16,14 +16,27 @@
*/
package org.apache.activemq.spi.core.protocol;
+import java.util.List;
+
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection;
-public interface ProtocolManager
+public interface ProtocolManager
{
+ ProtocolManagerFactory
getFactory();
+
+ /**
+ * This method will receive all the interceptors on the system and you should filter them out *
+ *
+ * @param incomingInterceptors
+ * @param outgoingInterceptors
+ */
+ void updateInterceptors(List incomingInterceptors, List outgoingInterceptors);
+
ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
void removeHandler(final String name);
@@ -37,6 +50,7 @@ public interface ProtocolManager
/**
* Gets the Message Converter towards ActiveMQ.
* Notice this being null means no need to convert
+ *
* @return
*/
MessageConverter getConverter();
diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
index 25f3d62fba..69f2426f4e 100644
--- a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
+++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java
@@ -18,12 +18,28 @@ package org.apache.activemq.spi.core.protocol;
import java.util.List;
-import org.apache.activemq.api.core.Interceptor;
+import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.core.server.ActiveMQServer;
-public interface ProtocolManagerFactory
+public interface ProtocolManagerFactory
{
- ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors);
+ /**
+ * When you create the ProtocolManager, you should filter out any interceptors that won't belong
+ * to this Protocol.
+ * For example don't send any core Interceptors {@link org.apache.activemq.api.core.Interceptor} to Stomp * * *
+ * @param server
+ * @param incomingInterceptors
+ * @param outgoingInterceptors
+ * @return
+ */
+ ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List
outgoingInterceptors);
+
+ /**
+ * This should get the entire list and only return the ones this factory can deal with *
+ * @param interceptors
+ * @return
+ */
+ List
filterInterceptors(List interceptors);
String[] getProtocols();
}
diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md
index 4258de299f..ad036880d8 100644
--- a/docs/user-manual/en/intercepting-operations.md
+++ b/docs/user-manual/en/intercepting-operations.md
@@ -20,6 +20,17 @@ public interface Interceptor
}
```
+For stomp protocol an interceptor must implement the `StompFrameInterceptor class`:
+
+``` java
+package org.apache.activemq.core.protocol.stomp;
+
+public interface StompFrameInterceptor
+{
+ public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
+}
+```
+
The returned boolean value is important:
- if `true` is returned, the process continues normally
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
index 976f74fed2..17a19435ea 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java
@@ -20,14 +20,20 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser;
import javax.jms.TextMessage;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.config.Configuration;
+import org.apache.activemq.core.protocol.core.Packet;
import org.apache.activemq.core.protocol.stomp.Stomp;
+import org.apache.activemq.core.protocol.stomp.StompFrame;
+import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor;
import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.core.registry.JndiBindingRegistry;
import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -41,6 +47,7 @@ import org.apache.activemq.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.jms.server.config.impl.TopicConfigurationImpl;
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase.TestLargeMessageInputStream;
import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
@@ -815,4 +822,175 @@ public class ExtraStompTest extends StompTestBase
return server;
}
+
+ public static class MyCoreInterceptor implements Interceptor
+ {
+ static List incomingInterceptedFrames = new ArrayList();
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection)
+ {
+ incomingInterceptedFrames.add(packet);
+ return true;
+ }
+
+ }
+
+
+ public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor
+ {
+ static List incomingInterceptedFrames = new ArrayList();
+
+ @Override
+ public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
+ {
+ incomingInterceptedFrames.add(stompFrame);
+ stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
+ return true;
+ }
+ }
+
+
+ public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor
+ {
+
+ static List outgoingInterceptedFrames = new ArrayList();
+
+ @Override
+ public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
+ {
+ outgoingInterceptedFrames.add(stompFrame);
+ stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
+ return true;
+ }
+ }
+
+ @Test
+ public void stompFrameInterceptor() throws Exception
+ {
+ MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
+ MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
+ try
+ {
+ List incomingInterceptorList = new ArrayList();
+ incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor");
+ incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor");
+ List outgoingInterceptorList = new ArrayList();
+ outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor");
+
+ server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList);
+ server.start();
+
+ setUpAfterServer(); // This will make some calls through core
+
+ // So we clear them here
+ MyCoreInterceptor.incomingInterceptedFrames.clear();
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
+ sendFrame(frame);
+
+ assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
+ sendMessage(getName());
+
+ // Something was supposed to be called on sendMessages
+ assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
+
+ receiveFrame(10000);
+
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ receiveFrame(10000);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+
+ sendFrame(frame);
+
+ }
+ finally
+ {
+ cleanUp();
+ server.stop();
+ }
+
+ List incomingCommands = new ArrayList(4);
+ incomingCommands.add("CONNECT");
+ incomingCommands.add("SUBSCRIBE");
+ incomingCommands.add("SEND");
+ incomingCommands.add("DISCONNECT");
+
+ List outgoingCommands = new ArrayList(3);
+ outgoingCommands.add("CONNECTED");
+ outgoingCommands.add("MESSAGE");
+ outgoingCommands.add("MESSAGE");
+
+ long timeout = System.currentTimeMillis() + 1000;
+
+ // Things are async, giving some time to things arrive before we actually assert
+ while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
+ MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
+ timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
+ Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
+
+ for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++)
+ {
+ Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
+ Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
+ }
+
+ for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++)
+ {
+ Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
+ }
+
+ Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
+ Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
+ }
+
+ protected JMSServerManager createServerWithStompInterceptor(List stompIncomingInterceptor, List stompOutgoingInterceptor) throws Exception
+ {
+
+ Map params = new HashMap();
+ params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
+ TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+
+ Configuration config = createBasicConfig()
+ .setPersistenceEnabled(false)
+ .addAcceptorConfiguration(stompTransport)
+ .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
+ .setIncomingInterceptorClassNames(stompIncomingInterceptor)
+ .setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
+
+ ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl()
+ .setName(getQueueName())
+ .setDurable(false)
+ .setBindings(getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl()
+ .setName(getTopicName())
+ .setBindings(getTopicName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
+ return server;
+ }
+
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
index 2387c719a5..937b85d2f7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
@@ -182,10 +182,10 @@ public abstract class StompTestBase extends UnitTestCase
createBootstrap();
connection = connectionFactory.createConnection();
+ connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(getQueueName());
topic = session.createTopic(getTopicName());
- connection.start();
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
index 8286ab665e..3be1f611ad 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java
@@ -625,8 +625,13 @@ public class QueueImplTest extends UnitTestCase
queue.resume();
// Need to make sure the consumers will receive the messages before we do these assertions
- long timeout = System.currentTimeMillis() + 1000;
- while (cons1.getReferences().size() != numMessages / 2 && cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
+ long timeout = System.currentTimeMillis() + 5000;
+ while (cons1.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(1);
+ }
+
+ while (cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
{
Thread.sleep(1);
}