This closes #191 - Stomp interceptors

This commit is contained in:
Clebert Suconic 2015-03-31 11:24:30 -04:00
commit c65ca252f8
19 changed files with 634 additions and 119 deletions

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
public interface BaseInterceptor<P>
{
/**
* Intercepts a packet which is received before it is sent to the channel
*
* @param packet the packet being received
* @param connection the connection the packet was received on
* @return {@code true} to process the next interceptor and handle the packet,
* {@code false} to abort processing of the packet
* @throws ActiveMQException
*/
boolean intercept(P packet, RemotingConnection connection) throws ActiveMQException;
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.api.core; package org.apache.activemq.api.core;
import org.apache.activemq.core.protocol.core.Packet; import org.apache.activemq.core.protocol.core.Packet;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
/** /**
* This is class is a simple way to intercepting calls on ActiveMQ client and servers. * This is class is a simple way to intercepting calls on ActiveMQ client and servers.
@ -26,16 +25,6 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection;
* {@literal activemq-configuration.xml}.<br> * {@literal activemq-configuration.xml}.<br>
* To add it to a client, use {@link org.apache.activemq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)} * To add it to a client, use {@link org.apache.activemq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)}
*/ */
public interface Interceptor public interface Interceptor extends BaseInterceptor<Packet>
{ {
/**
* Intercepts a packet which is received before it is sent to the channel
*
* @param packet the packet being received
* @param connection the connection the packet was received on
* @return {@code true} to process the next interceptor and handle the packet,
* {@code false} to abort processing of the packet
* @throws ActiveMQException
*/
boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException;
} }

View File

@ -16,10 +16,13 @@
*/ */
package org.apache.activemq.core.protocol.proton; package org.apache.activemq.core.protocol.proton;
import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.protocol.proton.converter.ProtonMessageConverter; import org.apache.activemq.core.protocol.proton.converter.ProtonMessageConverter;
import org.apache.activemq.core.protocol.proton.plug.ActiveMQProtonConnectionCallback; import org.apache.activemq.core.protocol.proton.plug.ActiveMQProtonConnectionCallback;
@ -30,6 +33,7 @@ import org.apache.activemq.core.server.management.NotificationListener;
import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.Connection;
@ -39,14 +43,18 @@ import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
/** /**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ resources * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ resources
*/ */
public class ProtonProtocolManager implements ProtocolManager, NotificationListener public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener
{ {
private final ActiveMQServer server; private final ActiveMQServer server;
private MessageConverter protonConverter; private MessageConverter protonConverter;
public ProtonProtocolManager(ActiveMQServer server)
private final ProtonProtocolManagerFactory factory;
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server)
{ {
this.factory = factory;
this.server = server; this.server = server;
this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
} }
@ -69,6 +77,18 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe
} }
@Override
public ProtocolManagerFactory<Interceptor> getFactory()
{
return factory;
}
@Override
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors)
{
// no op
}
@Override @Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
{ {
@ -97,7 +117,7 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe
@Override @Override
public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer)
{ {
ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection)connection; ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
protonConnection.bufferReceived(protonConnection.getID(), buffer); protonConnection.bufferReceived(protonConnection.getID(), buffer);
} }

View File

@ -16,14 +16,16 @@
*/ */
package org.apache.activemq.core.protocol.proton; package org.apache.activemq.core.protocol.proton;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import java.util.Collections;
import java.util.List; import java.util.List;
public class ProtonProtocolManagerFactory implements ProtocolManagerFactory public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
{ {
private static final String AMQP_PROTOCOL_NAME = "AMQP"; private static final String AMQP_PROTOCOL_NAME = "AMQP";
@ -32,7 +34,14 @@ public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
@Override @Override
public ProtocolManager createProtocolManager(ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) public ProtocolManager createProtocolManager(ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{ {
return new ProtonProtocolManager(server); return new ProtonProtocolManager(this, server);
}
@Override
public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
{
// no interceptors on Proton
return Collections.emptyList();
} }
@Override @Override

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.core.protocol.openwire; package org.apache.activemq.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -27,12 +28,12 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidClientIDException;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
@ -54,16 +55,6 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId; import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.journal.IOAsyncTask; import org.apache.activemq.core.journal.IOAsyncTask;
import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.core.protocol.openwire.amq.AMQPersistenceAdapter; import org.apache.activemq.core.protocol.openwire.amq.AMQPersistenceAdapter;
@ -74,16 +65,26 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat
import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.core.security.CheckType; import org.apache.activemq.core.security.CheckType;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.Connection;
import org.apache.activemq.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
public class OpenWireProtocolManager implements ProtocolManager public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
{ {
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator();
@ -91,6 +92,8 @@ public class OpenWireProtocolManager implements ProtocolManager
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQServer server; private final ActiveMQServer server;
private final OpenWireProtocolManagerFactory factory;
private OpenWireFormatFactory wireFactory; private OpenWireFormatFactory wireFactory;
private boolean tightEncodingEnabled = true; private boolean tightEncodingEnabled = true;
@ -104,7 +107,7 @@ public class OpenWireProtocolManager implements ProtocolManager
// from broker // from broker
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections
.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); .synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>(); private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
@ -118,8 +121,9 @@ public class OpenWireProtocolManager implements ProtocolManager
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>(); private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
public OpenWireProtocolManager(ActiveMQServer server) public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
{ {
this.factory = factory;
this.server = server; this.server = server;
this.wireFactory = new OpenWireFormatFactory(); this.wireFactory = new OpenWireFormatFactory();
// preferred prop, should be done via config // preferred prop, should be done via config
@ -128,17 +132,30 @@ public class OpenWireProtocolManager implements ProtocolManager
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
} }
public ProtocolManagerFactory<Interceptor> getFactory()
{
return factory;
}
@Override
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors)
{
// NO-OP
}
@Override @Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
Connection connection) Connection connection)
{ {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, OpenWireConnection owConn = new OpenWireConnection(acceptorUsed,
connection, this, wf); connection, this, wf);
owConn.init(); owConn.init();
return new ConnectionEntry(owConn, null, System.currentTimeMillis(), return new ConnectionEntry(owConn, null, System.currentTimeMillis(),
1 * 60 * 1000); 1 * 60 * 1000);
} }
@Override @Override
@ -171,7 +188,7 @@ public class OpenWireProtocolManager implements ProtocolManager
if (array.length < 8) if (array.length < 8)
{ {
throw new IllegalArgumentException("Protocol header length changed " throw new IllegalArgumentException("Protocol header length changed "
+ array.length); + array.length);
} }
int start = this.prefixPacketSize ? 4 : 0; int start = this.prefixPacketSize ? 4 : 0;
@ -207,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager
} }
public void handleCommand(OpenWireConnection openWireConnection, public void handleCommand(OpenWireConnection openWireConnection,
Object command) Object command)
{ {
Command amqCmd = (Command) command; Command amqCmd = (Command) command;
byte type = amqCmd.getDataStructureType(); byte type = amqCmd.getDataStructureType();
@ -221,14 +238,14 @@ public class OpenWireProtocolManager implements ProtocolManager
} }
public void sendReply(final OpenWireConnection connection, public void sendReply(final OpenWireConnection connection,
final Command command) final Command command)
{ {
server.getStorageManager().afterCompleteOperations(new IOAsyncTask() server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
{ {
public void onError(final int errorCode, final String errorMessage) public void onError(final int errorCode, final String errorMessage)
{ {
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode,
errorMessage); errorMessage);
} }
public void done() public void done()
@ -285,7 +302,7 @@ public class OpenWireProtocolManager implements ProtocolManager
if (clientId == null) if (clientId == null)
{ {
throw new InvalidClientIDException( throw new InvalidClientIDException(
"No clientID specified for connection request"); "No clientID specified for connection request");
} }
synchronized (clientIdSet) synchronized (clientIdSet)
{ {
@ -308,8 +325,8 @@ public class OpenWireProtocolManager implements ProtocolManager
else else
{ {
throw new InvalidClientIDException("Broker: " + getBrokerName() throw new InvalidClientIDException("Broker: " + getBrokerName()
+ " - Client: " + clientId + " already connected from " + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress()); + oldContext.getConnection().getRemoteAddress());
} }
} }
else else
@ -329,11 +346,11 @@ public class OpenWireProtocolManager implements ProtocolManager
// init the conn // init the conn
addSessions(context.getConnection(), context.getConnectionState() addSessions(context.getConnection(), context.getConnectionState()
.getSessionIds()); .getSessionIds());
} }
private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
Command copy) throws Exception Command copy) throws Exception
{ {
this.fireAdvisory(context, topic, copy, null); this.fireAdvisory(context, topic, copy, null);
} }
@ -351,26 +368,26 @@ public class OpenWireProtocolManager implements ProtocolManager
* See AdvisoryBroker.fireAdvisory() * See AdvisoryBroker.fireAdvisory()
*/ */
private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
Command command, ConsumerId targetConsumerId) throws Exception Command command, ConsumerId targetConsumerId) throws Exception
{ {
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty( advisoryMessage.setStringProperty(
AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty( advisoryMessage.setStringProperty(
AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = "tcp://localhost:61616"; String url = "tcp://localhost:61616";
advisoryMessage.setStringProperty( advisoryMessage.setStringProperty(
AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
// set the data structure // set the data structure
advisoryMessage.setDataStructure(command); advisoryMessage.setDataStructure(command);
advisoryMessage.setPersistent(false); advisoryMessage.setPersistent(false);
advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
advisoryMessage.setMessageId(new MessageId(advisoryProducerId, advisoryMessage.setMessageId(new MessageId(advisoryProducerId,
messageIdGenerator.getNextSequenceId())); messageIdGenerator.getNextSequenceId()));
advisoryMessage.setTargetConsumerId(targetConsumerId); advisoryMessage.setTargetConsumerId(targetConsumerId);
advisoryMessage.setDestination(topic); advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false); advisoryMessage.setResponseRequired(false);
@ -402,7 +419,7 @@ public class OpenWireProtocolManager implements ProtocolManager
try try
{ {
brokerName = InetAddressUtil.getLocalHostName().toLowerCase( brokerName = InetAddressUtil.getLocalHostName().toLowerCase(
Locale.ENGLISH); Locale.ENGLISH);
} }
catch (Exception e) catch (Exception e)
{ {
@ -445,34 +462,34 @@ public class OpenWireProtocolManager implements ProtocolManager
SessionId sessionId = info.getProducerId().getParentId(); SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
AMQTransportConnectionState cs = theConn AMQTransportConnectionState cs = theConn
.lookupConnectionState(connectionId); .lookupConnectionState(connectionId);
if (cs == null) if (cs == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
"Cannot add a producer to a connection that had not been registered: " "Cannot add a producer to a connection that had not been registered: "
+ connectionId); + connectionId);
} }
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
"Cannot add a producer to a session that had not been registered: " "Cannot add a producer to a session that had not been registered: "
+ sessionId); + sessionId);
} }
// Avoid replaying dup commands // Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) if (!ss.getProducerIds().contains(info.getProducerId()))
{ {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null if (destination != null
&& !AdvisorySupport.isAdvisoryTopic(destination)) && !AdvisorySupport.isAdvisoryTopic(destination))
{ {
if (theConn.getProducerCount(connectionId) >= theConn if (theConn.getProducerCount(connectionId) >= theConn
.getMaximumProducersAllowedPerConnection()) .getMaximumProducersAllowedPerConnection())
{ {
throw new IllegalStateException( throw new IllegalStateException(
"Can't add producer on connection " + connectionId "Can't add producer on connection " + connectionId
+ ": at maximum limit: " + ": at maximum limit: "
+ theConn.getMaximumProducersAllowedPerConnection()); + theConn.getMaximumProducersAllowedPerConnection());
} }
} }
@ -503,35 +520,35 @@ public class OpenWireProtocolManager implements ProtocolManager
SessionId sessionId = info.getConsumerId().getParentId(); SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
AMQTransportConnectionState cs = theConn AMQTransportConnectionState cs = theConn
.lookupConnectionState(connectionId); .lookupConnectionState(connectionId);
if (cs == null) if (cs == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
"Cannot add a consumer to a connection that had not been registered: " "Cannot add a consumer to a connection that had not been registered: "
+ connectionId); + connectionId);
} }
SessionState ss = cs.getSessionState(sessionId); SessionState ss = cs.getSessionState(sessionId);
if (ss == null) if (ss == null)
{ {
throw new IllegalStateException( throw new IllegalStateException(
this.server this.server
+ " Cannot add a consumer to a session that had not been registered: " + " Cannot add a consumer to a session that had not been registered: "
+ sessionId); + sessionId);
} }
// Avoid replaying dup commands // Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) if (!ss.getConsumerIds().contains(info.getConsumerId()))
{ {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null if (destination != null
&& !AdvisorySupport.isAdvisoryTopic(destination)) && !AdvisorySupport.isAdvisoryTopic(destination))
{ {
if (theConn.getConsumerCount(connectionId) >= theConn if (theConn.getConsumerCount(connectionId) >= theConn
.getMaximumConsumersAllowedPerConnection()) .getMaximumConsumersAllowedPerConnection())
{ {
throw new IllegalStateException( throw new IllegalStateException(
"Can't add consumer on connection " + connectionId "Can't add consumer on connection " + connectionId
+ ": at maximum limit: " + ": at maximum limit: "
+ theConn.getMaximumConsumersAllowedPerConnection()); + theConn.getMaximumConsumersAllowedPerConnection());
} }
} }
@ -562,7 +579,7 @@ public class OpenWireProtocolManager implements ProtocolManager
{ {
SessionId sid = iter.next(); SessionId sid = iter.next();
addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), addSession(theConn, theConn.getState().getSessionState(sid).getInfo(),
true); true);
} }
} }
@ -572,10 +589,10 @@ public class OpenWireProtocolManager implements ProtocolManager
} }
public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss,
boolean internal) boolean internal)
{ {
AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss,
server, theConn, this); server, theConn, this);
amqSession.initialize(); amqSession.initialize();
amqSession.setInternal(internal); amqSession.setInternal(internal);
sessions.put(ss.getSessionId(), amqSession); sessions.put(ss.getSessionId(), amqSession);
@ -583,7 +600,7 @@ public class OpenWireProtocolManager implements ProtocolManager
} }
public void removeConnection(AMQConnectionContext context, public void removeConnection(AMQConnectionContext context,
ConnectionInfo info, Throwable error) ConnectionInfo info, Throwable error)
{ {
// todo roll back tx // todo roll back tx
this.connections.remove(context.getConnection()); this.connections.remove(context.getConnection());
@ -630,13 +647,13 @@ public class OpenWireProtocolManager implements ProtocolManager
} }
public void addDestination(OpenWireConnection connection, public void addDestination(OpenWireConnection connection,
DestinationInfo info) throws Exception DestinationInfo info) throws Exception
{ {
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();
if (dest.isQueue()) if (dest.isQueue())
{ {
SimpleString qName = new SimpleString("jms.queue." SimpleString qName = new SimpleString("jms.queue."
+ dest.getPhysicalName()); + dest.getPhysicalName());
ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId()); ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId());
ConnectionInfo connInfo = state.getInfo(); ConnectionInfo connInfo = state.getInfo();
if (connInfo != null) if (connInfo != null)
@ -646,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager
AMQServerSession fakeSession = new AMQServerSession(user, pass); AMQServerSession fakeSession = new AMQServerSession(user, pass);
CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
((ActiveMQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession); ((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession);
} }
this.server.createQueue(qName, qName, null, false, true); this.server.createQueue(qName, qName, null, false, true);
if (dest.isTemporary()) if (dest.isTemporary())

View File

@ -16,14 +16,16 @@
*/ */
package org.apache.activemq.core.protocol.openwire; package org.apache.activemq.core.protocol.openwire;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
{ {
public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE"; public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
@ -31,7 +33,13 @@ public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{ {
return new OpenWireProtocolManager(server); return new OpenWireProtocolManager(this, server);
}
@Override
public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
{
return Collections.emptyList();
} }
@Override @Override

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.protocol.stomp;
import org.apache.activemq.api.core.BaseInterceptor;
/**
* This class is a simple way to intercepting client calls on ActiveMQ using STOMP protocol.
* <p>
* To add an interceptor to ActiveMQ server, you have to modify the server configuration file
* {@literal activemq-configuration.xml}.<br>
*/
public interface StompFrameInterceptor extends BaseInterceptor<StompFrame>
{
}

View File

@ -26,10 +26,9 @@ import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQExceptionType; import org.apache.activemq.api.core.ActiveMQExceptionType;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.management.CoreNotificationType; import org.apache.activemq.api.core.management.CoreNotificationType;
@ -49,6 +48,7 @@ import org.apache.activemq.core.server.management.NotificationListener;
import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.Connection;
@ -62,7 +62,7 @@ import static org.apache.activemq.core.protocol.stomp.ActiveMQStompProtocolMessa
/** /**
* StompProtocolManager * StompProtocolManager
*/ */
class StompProtocolManager implements ProtocolManager, NotificationListener class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, NotificationListener
{ {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
@ -70,6 +70,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
private final ActiveMQServer server; private final ActiveMQServer server;
private final StompProtocolManagerFactory factory;
private final Executor executor; private final Executor executor;
private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>(); private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
@ -79,12 +81,16 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
private final Set<String> destinations = new ConcurrentHashSet<String>(); private final Set<String> destinations = new ConcurrentHashSet<String>();
private final List<StompFrameInterceptor> incomingInterceptors;
private final List<StompFrameInterceptor> outgoingInterceptors;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
public StompProtocolManager(final ActiveMQServer server, final List<Interceptor> interceptors) public StompProtocolManager(final StompProtocolManagerFactory factory, final ActiveMQServer server, final List<StompFrameInterceptor> incomingInterceptors, final List<StompFrameInterceptor> outgoingInterceptors)
{ {
this.factory = factory;
this.server = server; this.server = server;
this.executor = server.getExecutorFactory().getExecutor(); this.executor = server.getExecutorFactory().getExecutor();
ManagementService service = server.getManagementService(); ManagementService service = server.getManagementService();
@ -94,6 +100,24 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
destinations.add(service.getManagementAddress().toString()); destinations.add(service.getManagementAddress().toString());
service.addNotificationListener(this); service.addNotificationListener(this);
} }
this.incomingInterceptors = incomingInterceptors;
this.outgoingInterceptors = outgoingInterceptors;
}
@Override
public ProtocolManagerFactory<StompFrameInterceptor> getFactory()
{
return factory;
}
@Override
public void updateInterceptors(List<BaseInterceptor> incoming, List<BaseInterceptor> outgoing)
{
this.incomingInterceptors.clear();
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
this.outgoingInterceptors.clear();
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
} }
@Override @Override
@ -166,6 +190,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
try try
{ {
invokeInterceptors(this.incomingInterceptors, request, conn);
conn.handleFrame(request); conn.handleFrame(request);
} }
finally finally
@ -201,6 +226,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
{ {
ActiveMQServerLogger.LOGGER.trace("sent " + frame); ActiveMQServerLogger.LOGGER.trace("sent " + frame);
} }
invokeInterceptors(this.outgoingInterceptors, frame, connection);
synchronized (connection) synchronized (connection)
{ {
if (connection.isDestroyed()) if (connection.isDestroyed())
@ -336,7 +364,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
ActiveMQStompException e = new ActiveMQStompException("Error sending reply", ActiveMQStompException e = new ActiveMQStompException("Error sending reply",
ActiveMQExceptionType.createException(errorCode, errorMessage)); ActiveMQExceptionType.createException(errorCode, errorMessage));
StompFrame error = e.getFrame(); StompFrame error = e.getFrame();
send(connection, error); send(connection, error);
@ -410,7 +438,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
if (stompSession.containsSubscription(subscriptionID)) if (stompSession.containsSubscription(subscriptionID))
{ {
throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID + throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID +
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
} }
long consumerID = server.getStorageManager().generateID(); long consumerID = server.getStorageManager().generateID();
String clientID = (connection.getClientID() != null) ? connection.getClientID() : null; String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
@ -504,4 +532,25 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
{ {
return server; return server;
} }
private void invokeInterceptors(List<StompFrameInterceptor> interceptors, final StompFrame frame, final StompConnection connection)
{
if (interceptors != null && !interceptors.isEmpty())
{
for (StompFrameInterceptor interceptor : interceptors)
{
try
{
if (!interceptor.intercept(frame, connection))
{
break;
}
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.error(e);
}
}
}
}
} }

View File

@ -18,20 +18,26 @@ package org.apache.activemq.core.protocol.stomp;
import java.util.List; import java.util.List;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
public class StompProtocolManagerFactory implements ProtocolManagerFactory public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory<StompFrameInterceptor>
{ {
public static final String STOMP_PROTOCOL_NAME = "STOMP"; public static final String STOMP_PROTOCOL_NAME = "STOMP";
private static String[] SUPPORTED_PROTOCOLS = {STOMP_PROTOCOL_NAME}; private static String[] SUPPORTED_PROTOCOLS = {STOMP_PROTOCOL_NAME};
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<StompFrameInterceptor> incomingInterceptors, List<StompFrameInterceptor> outgoingInterceptors)
{ {
return new StompProtocolManager(server, incomingInterceptors); return new StompProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
}
@Override
public List<StompFrameInterceptor> filterInterceptors(List<BaseInterceptor> interceptors)
{
return filterInterceptors(StompFrameInterceptor.class, interceptors);
} }
@Override @Override

View File

@ -25,11 +25,12 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.Pair; import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ClusterTopologyListener;
import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ClusterTopologyListener;
import org.apache.activemq.api.core.client.TopologyMember; import org.apache.activemq.api.core.client.TopologyMember;
import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.protocol.ServerPacketDecoder; import org.apache.activemq.core.protocol.ServerPacketDecoder;
@ -53,11 +54,12 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.MessageConverter;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.Connection;
class CoreProtocolManager implements ProtocolManager class CoreProtocolManager implements ProtocolManager<Interceptor>
{ {
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -67,8 +69,12 @@ class CoreProtocolManager implements ProtocolManager
private final List<Interceptor> outgoingInterceptors; private final List<Interceptor> outgoingInterceptors;
CoreProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) private final CoreProtocolManagerFactory protocolManagerFactory;
CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{ {
this.protocolManagerFactory = factory;
this.server = server; this.server = server;
this.incomingInterceptors = incomingInterceptors; this.incomingInterceptors = incomingInterceptors;
@ -76,8 +82,26 @@ class CoreProtocolManager implements ProtocolManager
this.outgoingInterceptors = outgoingInterceptors; this.outgoingInterceptors = outgoingInterceptors;
} }
@Override
public ProtocolManagerFactory<Interceptor> getFactory()
{
return protocolManagerFactory;
}
@Override
public void updateInterceptors(List<BaseInterceptor> incoming, List<BaseInterceptor> outgoing)
{
this.incomingInterceptors.clear();
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
this.outgoingInterceptors.clear();
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
/** /**
* no need to implement this now * no need to implement this now
*
* @return * @return
*/ */
@Override @Override

View File

@ -18,19 +18,36 @@ package org.apache.activemq.core.protocol.core.impl;
import java.util.List; import java.util.List;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory;
public class CoreProtocolManagerFactory implements ProtocolManagerFactory public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor>
{ {
private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL}; private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL};
/**
* {@inheritDoc} *
* @param server
* @param incomingInterceptors
* @param outgoingInterceptors
* @return
*/
public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{ {
return new CoreProtocolManager(server, incomingInterceptors, outgoingInterceptors); return new CoreProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
}
@Override
public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors)
{
// This is using this tool method
// it wouldn't be possible to write a generic method without this class parameter
// and I didn't want to bloat the cllaers for this
return filterInterceptors(Interceptor.class, interceptors);
} }
@Override @Override

View File

@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.ActiveMQInterruptedException; import org.apache.activemq.api.core.ActiveMQInterruptedException;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.config.Configuration;
@ -53,8 +54,8 @@ import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.cluster.ClusterConnection; import org.apache.activemq.core.server.cluster.ClusterConnection;
import org.apache.activemq.core.server.cluster.ClusterManager; import org.apache.activemq.core.server.cluster.ClusterManager;
import org.apache.activemq.core.server.impl.ServiceRegistry;
import org.apache.activemq.core.server.impl.ServerSessionImpl; import org.apache.activemq.core.server.impl.ServerSessionImpl;
import org.apache.activemq.core.server.impl.ServiceRegistry;
import org.apache.activemq.core.server.management.ManagementService; import org.apache.activemq.core.server.management.ManagementService;
import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.spi.core.protocol.ProtocolManager; import org.apache.activemq.spi.core.protocol.ProtocolManager;
@ -65,9 +66,9 @@ import org.apache.activemq.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.spi.core.remoting.BufferHandler; import org.apache.activemq.spi.core.remoting.BufferHandler;
import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.Connection;
import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.utils.ActiveMQThreadFactory;
import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.ConfigurationHelper; import org.apache.activemq.utils.ConfigurationHelper;
import org.apache.activemq.utils.ActiveMQThreadFactory;
import org.apache.activemq.utils.ReusableLatch; import org.apache.activemq.utils.ReusableLatch;
public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener
@ -84,9 +85,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
private final Set<TransportConfiguration> acceptorsConfig; private final Set<TransportConfiguration> acceptorsConfig;
private final List<Interceptor> incomingInterceptors = new CopyOnWriteArrayList<Interceptor>(); private final List<BaseInterceptor> incomingInterceptors = new CopyOnWriteArrayList<>();
private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<Interceptor>(); private final List<BaseInterceptor> outgoingInterceptors = new CopyOnWriteArrayList<>();
private final Map<String, Acceptor> acceptors = new HashMap<String, Acceptor>(); private final Map<String, Acceptor> acceptors = new HashMap<String, Acceptor>();
@ -147,7 +148,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]); ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]);
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0],
coreProtocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors),
coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
if (config.isResolveProtocols()) if (config.isResolveProtocols())
{ {
@ -160,7 +162,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
for (String protocol : protocols) for (String protocol : protocols)
{ {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol); ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol);
protocolMap.put(protocol, next.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors),
next.filterInterceptors(outgoingInterceptors)));
} }
} }
} }
@ -190,11 +193,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors()); outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors());
} }
private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<Interceptor> interceptors) private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<BaseInterceptor> interceptors)
{ {
for (String className : classNames) for (String className : classNames)
{ {
Interceptor interceptor = ((Interceptor) safeInitNewInstance(className)); BaseInterceptor interceptor = ((BaseInterceptor) safeInitNewInstance(className));
interceptors.add(interceptor); interceptors.add(interceptor);
} }
} }
@ -221,8 +224,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
// to support many hundreds of connections, but the main thread pool must be kept small for better performance // to support many hundreds of connections, but the main thread pool must be kept small for better performance
ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() + ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
"-" + "-" +
System.identityHashCode(this), false, tccl); System.identityHashCode(this), false, tccl);
threadPool = Executors.newCachedThreadPool(tFactory); threadPool = Executors.newCachedThreadPool(tFactory);
@ -620,24 +623,43 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
public void addIncomingInterceptor(final Interceptor interceptor) public void addIncomingInterceptor(final Interceptor interceptor)
{ {
incomingInterceptors.add(interceptor); incomingInterceptors.add(interceptor);
updateProtocols();
} }
@Override @Override
public boolean removeIncomingInterceptor(final Interceptor interceptor) public boolean removeIncomingInterceptor(final Interceptor interceptor)
{ {
return incomingInterceptors.remove(interceptor); if (incomingInterceptors.remove(interceptor))
{
updateProtocols();
return true;
}
else
{
return false;
}
} }
@Override @Override
public void addOutgoingInterceptor(final Interceptor interceptor) public void addOutgoingInterceptor(final Interceptor interceptor)
{ {
outgoingInterceptors.add(interceptor); outgoingInterceptors.add(interceptor);
updateProtocols();
} }
@Override @Override
public boolean removeOutgoingInterceptor(final Interceptor interceptor) public boolean removeOutgoingInterceptor(final Interceptor interceptor)
{ {
return outgoingInterceptors.remove(interceptor); if (outgoingInterceptors.remove(interceptor))
{
updateProtocols();
return true;
}
else
{
return false;
}
} }
private ClusterConnection lookupClusterConnection(TransportConfiguration acceptorConfig) private ClusterConnection lookupClusterConnection(TransportConfiguration acceptorConfig)
@ -803,4 +825,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
}); });
} }
protected void updateProtocols()
{
for (ProtocolManager<?> protocolManager : this.protocolMap.values())
{
protocolManager.updateInterceptors(incomingInterceptors, outgoingInterceptors);
}
}
} }

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.spi.core.protocol;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.api.core.BaseInterceptor;
public abstract class AbstractProtocolManagerFactory<P extends BaseInterceptor> implements ProtocolManagerFactory<P>
{
/**
* This method exists because java templates won't store the type of P at runtime.
* So it's not possible to write a generic method with having the Class Type.
* This will serve as a tool for sub classes to filter properly* * *
*
* @param type
* @param listIn
* @return
*/
protected List<P> filterInterceptors(Class<P> type, List<? extends BaseInterceptor> listIn)
{
if (listIn == null)
{
return Collections.emptyList();
}
else
{
CopyOnWriteArrayList<P> listOut = new CopyOnWriteArrayList();
for (BaseInterceptor<?> in : listIn)
{
if (type.isInstance(in))
{
listOut.add((P) in);
}
}
return listOut;
}
}
}

View File

@ -16,14 +16,27 @@
*/ */
package org.apache.activemq.spi.core.protocol; package org.apache.activemq.spi.core.protocol;
import java.util.List;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Acceptor;
import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.Connection;
public interface ProtocolManager public interface ProtocolManager<P extends BaseInterceptor>
{ {
ProtocolManagerFactory<P> getFactory();
/**
* This method will receive all the interceptors on the system and you should filter them out *
*
* @param incomingInterceptors
* @param outgoingInterceptors
*/
void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors);
ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection); ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
void removeHandler(final String name); void removeHandler(final String name);
@ -37,6 +50,7 @@ public interface ProtocolManager
/** /**
* Gets the Message Converter towards ActiveMQ. * Gets the Message Converter towards ActiveMQ.
* Notice this being null means no need to convert * Notice this being null means no need to convert
*
* @return * @return
*/ */
MessageConverter getConverter(); MessageConverter getConverter();

View File

@ -18,12 +18,28 @@ package org.apache.activemq.spi.core.protocol;
import java.util.List; import java.util.List;
import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.BaseInterceptor;
import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServer;
public interface ProtocolManagerFactory public interface ProtocolManagerFactory<P extends BaseInterceptor>
{ {
ProtocolManager createProtocolManager(ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors); /**
* When you create the ProtocolManager, you should filter out any interceptors that won't belong
* to this Protocol.
* For example don't send any core Interceptors {@link org.apache.activemq.api.core.Interceptor} to Stomp * * *
* @param server
* @param incomingInterceptors
* @param outgoingInterceptors
* @return
*/
ProtocolManager createProtocolManager(ActiveMQServer server, List<P> incomingInterceptors, List<P> outgoingInterceptors);
/**
* This should get the entire list and only return the ones this factory can deal with *
* @param interceptors
* @return
*/
List<P> filterInterceptors(List<BaseInterceptor> interceptors);
String[] getProtocols(); String[] getProtocols();
} }

View File

@ -20,6 +20,17 @@ public interface Interceptor
} }
``` ```
For stomp protocol an interceptor must implement the `StompFrameInterceptor class`:
``` java
package org.apache.activemq.core.protocol.stomp;
public interface StompFrameInterceptor
{
public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
}
```
The returned boolean value is important: The returned boolean value is important:
- if `true` is returned, the process continues normally - if `true` is returned, the process continues normally

View File

@ -20,14 +20,20 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser; import javax.jms.QueueBrowser;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.api.core.Interceptor;
import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.protocol.core.Packet;
import org.apache.activemq.core.protocol.stomp.Stomp; import org.apache.activemq.core.protocol.stomp.Stomp;
import org.apache.activemq.core.protocol.stomp.StompFrame;
import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor;
import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.core.registry.JndiBindingRegistry; import org.apache.activemq.core.registry.JndiBindingRegistry;
import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory; import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory;
@ -41,6 +47,7 @@ import org.apache.activemq.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.jms.server.config.impl.JMSQueueConfigurationImpl; import org.apache.activemq.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.jms.server.config.impl.TopicConfigurationImpl; import org.apache.activemq.jms.server.config.impl.TopicConfigurationImpl;
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase.TestLargeMessageInputStream; import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase.TestLargeMessageInputStream;
import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
@ -815,4 +822,175 @@ public class ExtraStompTest extends StompTestBase
return server; return server;
} }
public static class MyCoreInterceptor implements Interceptor
{
static List<Packet> incomingInterceptedFrames = new ArrayList<Packet>();
@Override
public boolean intercept(Packet packet, RemotingConnection connection)
{
incomingInterceptedFrames.add(packet);
return true;
}
}
public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor
{
static List<StompFrame> incomingInterceptedFrames = new ArrayList<StompFrame>();
@Override
public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
{
incomingInterceptedFrames.add(stompFrame);
stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
return true;
}
}
public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor
{
static List<StompFrame> outgoingInterceptedFrames = new ArrayList<StompFrame>();
@Override
public boolean intercept(StompFrame stompFrame, RemotingConnection connection)
{
outgoingInterceptedFrames.add(stompFrame);
stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
return true;
}
}
@Test
public void stompFrameInterceptor() throws Exception
{
MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
try
{
List<String> incomingInterceptorList = new ArrayList<String>();
incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor");
incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor");
List<String> outgoingInterceptorList = new ArrayList<String>();
outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor");
server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList);
server.start();
setUpAfterServer(); // This will make some calls through core
// So we clear them here
MyCoreInterceptor.incomingInterceptedFrames.clear();
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(100000);
frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
sendFrame(frame);
assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
sendMessage(getName());
// Something was supposed to be called on sendMessages
assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
receiveFrame(10000);
frame = "SEND\n" + "destination:" +
getQueuePrefix() +
getQueueName() +
"\n\n" +
"Hello World" +
Stomp.NULL;
sendFrame(frame);
receiveFrame(10000);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}
finally
{
cleanUp();
server.stop();
}
List<String> incomingCommands = new ArrayList<String>(4);
incomingCommands.add("CONNECT");
incomingCommands.add("SUBSCRIBE");
incomingCommands.add("SEND");
incomingCommands.add("DISCONNECT");
List<String> outgoingCommands = new ArrayList<String>(3);
outgoingCommands.add("CONNECTED");
outgoingCommands.add("MESSAGE");
outgoingCommands.add("MESSAGE");
long timeout = System.currentTimeMillis() + 1000;
// Things are async, giving some time to things arrive before we actually assert
while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
timeout > System.currentTimeMillis())
{
Thread.sleep(10);
}
Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++)
{
Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
}
for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++)
{
Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
}
Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
}
protected JMSServerManager createServerWithStompInterceptor(List<String> stompIncomingInterceptor, List<String> stompOutgoingInterceptor) throws Exception
{
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
Configuration config = createBasicConfig()
.setPersistenceEnabled(false)
.addAcceptorConfiguration(stompTransport)
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY))
.setIncomingInterceptorClassNames(stompIncomingInterceptor)
.setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl()
.setName(getQueueName())
.setDurable(false)
.setBindings(getQueueName()));
jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl()
.setName(getTopicName())
.setBindings(getTopicName()));
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
return server;
}
} }

View File

@ -182,10 +182,10 @@ public abstract class StompTestBase extends UnitTestCase
createBootstrap(); createBootstrap();
connection = connectionFactory.createConnection(); connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(getQueueName()); queue = session.createQueue(getQueueName());
topic = session.createTopic(getTopicName()); topic = session.createTopic(getTopicName());
connection.start();
} }

View File

@ -625,8 +625,13 @@ public class QueueImplTest extends UnitTestCase
queue.resume(); queue.resume();
// Need to make sure the consumers will receive the messages before we do these assertions // Need to make sure the consumers will receive the messages before we do these assertions
long timeout = System.currentTimeMillis() + 1000; long timeout = System.currentTimeMillis() + 5000;
while (cons1.getReferences().size() != numMessages / 2 && cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis()) while (cons1.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
{
Thread.sleep(1);
}
while (cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis())
{ {
Thread.sleep(1); Thread.sleep(1);
} }