From b2524b1be49198b1a96b78e70d7777b5214b8a8a Mon Sep 17 00:00:00 2001 From: nberdikov Date: Fri, 13 Mar 2015 10:29:33 +0100 Subject: [PATCH] ACTIVEMQ6-89 Added possibility to intercept stomp frames https://issues.apache.org/jira/browse/ACTIVEMQ6-89 This was originally contributed at #182. We have squashed the commits and rebased them here This closes #182 --- .../protocol/stomp/StompFrameInterceptor.java | 58 ++++++++ .../protocol/stomp/StompProtocolManager.java | 35 ++++- .../stomp/StompProtocolManagerFactory.java | 2 +- .../user-manual/en/intercepting-operations.md | 11 ++ .../integration/stomp/ExtraStompTest.java | 138 ++++++++++++++++++ 5 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java new file mode 100644 index 0000000000..9af6e2772f --- /dev/null +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.core.protocol.stomp; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.core.protocol.core.Packet; +import org.apache.activemq.spi.core.protocol.RemotingConnection; + +/** + * This class is a simple way to intercepting client calls on ActiveMQ using STOMP protocol. + *

+ * To add an interceptor to ActiveMQ server, you have to modify the server configuration file + * {@literal activemq-configuration.xml}.
+ */ +public abstract class StompFrameInterceptor implements Interceptor +{ + + /** + * Intercepts a packet which is received before it is sent to the channel. + * By default does not do anything and returns true allowing other interceptors perform logic. + * + * @param packet the packet being received + * @param connection the connection the packet was received on + * @return {@code true} to process the next interceptor and handle the packet, + * {@code false} to abort processing of the packet + * @throws ActiveMQException + */ + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException + { + return true; + } + + /** + * Intercepts a stomp frame sent by a client. + * + * @param stompFrame the stomp frame being received + * @param connection the connection the stomp frame was received on + * @return {@code true} to process the next interceptor and handle the stomp frame, + * {@code false} to abort processing of the stomp frame + */ + public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); +} diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java index 1b71987e2e..9fb0362092 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java @@ -79,11 +79,14 @@ class StompProtocolManager implements ProtocolManager, NotificationListener private final Set destinations = new ConcurrentHashSet(); + private final List incomingInterceptors; + private final List outgoingInterceptors; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public StompProtocolManager(final ActiveMQServer server, final List interceptors) + public StompProtocolManager(final ActiveMQServer server, final List incomingInterceptors, final List outgoingInterceptors) { this.server = server; this.executor = server.getExecutorFactory().getExecutor(); @@ -94,6 +97,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener destinations.add(service.getManagementAddress().toString()); service.addNotificationListener(this); } + this.incomingInterceptors = incomingInterceptors; + this.outgoingInterceptors = outgoingInterceptors; } @Override @@ -166,6 +171,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener try { + invokeInterceptors(this.incomingInterceptors, request, conn); conn.handleFrame(request); } finally @@ -201,6 +207,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener { ActiveMQServerLogger.LOGGER.trace("sent " + frame); } + + invokeInterceptors(this.outgoingInterceptors, frame, connection); + synchronized (connection) { if (connection.isDestroyed()) @@ -504,4 +513,28 @@ class StompProtocolManager implements ProtocolManager, NotificationListener { return server; } + + private void invokeInterceptors(List interceptors, final StompFrame frame, final StompConnection connection) + { + if (interceptors != null && !interceptors.isEmpty()) + { + for (Interceptor interceptor : interceptors) + { + if (interceptor instanceof StompFrameInterceptor) + { + try + { + if (!((StompFrameInterceptor)interceptor).intercept(frame, connection)) + { + break; + } + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.error(e); + } + } + } + } + } } diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java index 72e7734110..8cf5e6ebae 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java @@ -31,7 +31,7 @@ public class StompProtocolManagerFactory implements ProtocolManagerFactory public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - return new StompProtocolManager(server, incomingInterceptors); + return new StompProtocolManager(server, incomingInterceptors, outgoingInterceptors); } @Override diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md index 4258de299f..145aba2d44 100644 --- a/docs/user-manual/en/intercepting-operations.md +++ b/docs/user-manual/en/intercepting-operations.md @@ -20,6 +20,17 @@ public interface Interceptor } ``` +For stomp protocol an interceptor must extend the `StompFrameInterceptor class`: + +``` java +package org.apache.activemq.core.protocol.stomp; + +public abstract class StompFrameInterceptor +{ + public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); +} +``` + The returned boolean value is important: - if `true` is returned, the process continues normally diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java index 976f74fed2..8df0879909 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java @@ -20,14 +20,18 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.QueueBrowser; import javax.jms.TextMessage; +import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.protocol.stomp.Stomp; +import org.apache.activemq.core.protocol.stomp.StompFrame; +import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor; import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.core.registry.JndiBindingRegistry; import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory; @@ -41,6 +45,7 @@ import org.apache.activemq.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.jms.server.config.impl.JMSQueueConfigurationImpl; import org.apache.activemq.jms.server.config.impl.TopicConfigurationImpl; import org.apache.activemq.jms.server.impl.JMSServerManagerImpl; +import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase.TestLargeMessageInputStream; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; @@ -815,4 +820,137 @@ public class ExtraStompTest extends StompTestBase return server; } + static List incomingInterceptedFrames = new ArrayList(); + static List outgoingInterceptedFrames = new ArrayList(); + + public static class MyIncomingStompFrameInterceptor extends StompFrameInterceptor + { + @Override + public boolean intercept(StompFrame stompFrame, RemotingConnection connection) + { + incomingInterceptedFrames.add(stompFrame); + stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal"); + return true; + } + } + + public static class MyOutgoingStompFrameInterceptor extends StompFrameInterceptor + { + @Override + public boolean intercept(StompFrame stompFrame, RemotingConnection connection) + { + outgoingInterceptedFrames.add(stompFrame); + stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal"); + return true; + } + } + + @Test + public void stompFrameInterceptor() throws Exception + { + try + { + List incomingInterceptorList = new ArrayList(); + incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor"); + List outgoingInterceptorList = new ArrayList(); + outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor"); + + server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList); + server.start(); + + setUpAfterServer(); + + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(100000); + + frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; + sendFrame(frame); + + sendMessage(getName()); + + receiveFrame(10000); + + frame = "SEND\n" + "destination:" + + getQueuePrefix() + + getQueueName() + + "\n\n" + + "Hello World" + + Stomp.NULL; + sendFrame(frame); + + receiveFrame(10000); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + + sendFrame(frame); + + } + finally + { + cleanUp(); + server.stop(); + } + + List incomingCommands = new ArrayList(4); + incomingCommands.add("CONNECT"); + incomingCommands.add("SUBSCRIBE"); + incomingCommands.add("SEND"); + incomingCommands.add("DISCONNECT"); + + List outgoingCommands = new ArrayList(3); + outgoingCommands.add("CONNECTED"); + outgoingCommands.add("MESSAGE"); + outgoingCommands.add("MESSAGE"); + + Assert.assertEquals(4, incomingInterceptedFrames.size()); + Assert.assertEquals(3, outgoingInterceptedFrames.size()); + + for (int i = 0; i < incomingInterceptedFrames.size(); i++) + { + Assert.assertEquals(incomingCommands.get(i), incomingInterceptedFrames.get(i).getCommand()); + Assert.assertEquals("incomingInterceptedVal", incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp")); + } + + for (int i = 0; i < outgoingInterceptedFrames.size(); i++) + { + Assert.assertEquals(outgoingCommands.get(i), outgoingInterceptedFrames.get(i).getCommand()); + } + + Assert.assertEquals("incomingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp")); + Assert.assertEquals("outgoingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp")); + } + + protected JMSServerManager createServerWithStompInterceptor(List stompIncomingInterceptor, List stompOutgoingInterceptor) throws Exception + { + + Map params = new HashMap(); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); + params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); + params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); + TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); + + Configuration config = createBasicConfig() + .setPersistenceEnabled(false) + .addAcceptorConfiguration(stompTransport) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) + .setIncomingInterceptorClassNames(stompIncomingInterceptor) + .setOutgoingInterceptorClassNames(stompOutgoingInterceptor); + + ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); + + JMSConfiguration jmsConfig = new JMSConfigurationImpl(); + jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl() + .setName(getQueueName()) + .setDurable(false) + .setBindings(getQueueName())); + jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl() + .setName(getTopicName()) + .setBindings(getTopicName())); + server = new JMSServerManagerImpl(hornetQServer, jmsConfig); + server.setRegistry(new JndiBindingRegistry(new InVMNamingContext())); + return server; + } + }