From b2524b1be49198b1a96b78e70d7777b5214b8a8a Mon Sep 17 00:00:00 2001 From: nberdikov Date: Fri, 13 Mar 2015 10:29:33 +0100 Subject: [PATCH 1/3] ACTIVEMQ6-89 Added possibility to intercept stomp frames https://issues.apache.org/jira/browse/ACTIVEMQ6-89 This was originally contributed at #182. We have squashed the commits and rebased them here This closes #182 --- .../protocol/stomp/StompFrameInterceptor.java | 58 ++++++++ .../protocol/stomp/StompProtocolManager.java | 35 ++++- .../stomp/StompProtocolManagerFactory.java | 2 +- .../user-manual/en/intercepting-operations.md | 11 ++ .../integration/stomp/ExtraStompTest.java | 138 ++++++++++++++++++ 5 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java new file mode 100644 index 0000000000..9af6e2772f --- /dev/null +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.core.protocol.stomp; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.core.protocol.core.Packet; +import org.apache.activemq.spi.core.protocol.RemotingConnection; + +/** + * This class is a simple way to intercepting client calls on ActiveMQ using STOMP protocol. + *

+ * To add an interceptor to ActiveMQ server, you have to modify the server configuration file + * {@literal activemq-configuration.xml}.
+ */ +public abstract class StompFrameInterceptor implements Interceptor +{ + + /** + * Intercepts a packet which is received before it is sent to the channel. + * By default does not do anything and returns true allowing other interceptors perform logic. + * + * @param packet the packet being received + * @param connection the connection the packet was received on + * @return {@code true} to process the next interceptor and handle the packet, + * {@code false} to abort processing of the packet + * @throws ActiveMQException + */ + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException + { + return true; + } + + /** + * Intercepts a stomp frame sent by a client. + * + * @param stompFrame the stomp frame being received + * @param connection the connection the stomp frame was received on + * @return {@code true} to process the next interceptor and handle the stomp frame, + * {@code false} to abort processing of the stomp frame + */ + public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); +} diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java index 1b71987e2e..9fb0362092 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java @@ -79,11 +79,14 @@ class StompProtocolManager implements ProtocolManager, NotificationListener private final Set destinations = new ConcurrentHashSet(); + private final List incomingInterceptors; + private final List outgoingInterceptors; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public StompProtocolManager(final ActiveMQServer server, final List interceptors) + public StompProtocolManager(final ActiveMQServer server, final List incomingInterceptors, final List outgoingInterceptors) { this.server = server; this.executor = server.getExecutorFactory().getExecutor(); @@ -94,6 +97,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener destinations.add(service.getManagementAddress().toString()); service.addNotificationListener(this); } + this.incomingInterceptors = incomingInterceptors; + this.outgoingInterceptors = outgoingInterceptors; } @Override @@ -166,6 +171,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener try { + invokeInterceptors(this.incomingInterceptors, request, conn); conn.handleFrame(request); } finally @@ -201,6 +207,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener { ActiveMQServerLogger.LOGGER.trace("sent " + frame); } + + invokeInterceptors(this.outgoingInterceptors, frame, connection); + synchronized (connection) { if (connection.isDestroyed()) @@ -504,4 +513,28 @@ class StompProtocolManager implements ProtocolManager, NotificationListener { return server; } + + private void invokeInterceptors(List interceptors, final StompFrame frame, final StompConnection connection) + { + if (interceptors != null && !interceptors.isEmpty()) + { + for (Interceptor interceptor : interceptors) + { + if (interceptor instanceof StompFrameInterceptor) + { + try + { + if (!((StompFrameInterceptor)interceptor).intercept(frame, connection)) + { + break; + } + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.error(e); + } + } + } + } + } } diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java index 72e7734110..8cf5e6ebae 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java @@ -31,7 +31,7 @@ public class StompProtocolManagerFactory implements ProtocolManagerFactory public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - return new StompProtocolManager(server, incomingInterceptors); + return new StompProtocolManager(server, incomingInterceptors, outgoingInterceptors); } @Override diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md index 4258de299f..145aba2d44 100644 --- a/docs/user-manual/en/intercepting-operations.md +++ b/docs/user-manual/en/intercepting-operations.md @@ -20,6 +20,17 @@ public interface Interceptor } ``` +For stomp protocol an interceptor must extend the `StompFrameInterceptor class`: + +``` java +package org.apache.activemq.core.protocol.stomp; + +public abstract class StompFrameInterceptor +{ + public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); +} +``` + The returned boolean value is important: - if `true` is returned, the process continues normally diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java index 976f74fed2..8df0879909 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java @@ -20,14 +20,18 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.QueueBrowser; import javax.jms.TextMessage; +import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.protocol.stomp.Stomp; +import org.apache.activemq.core.protocol.stomp.StompFrame; +import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor; import org.apache.activemq.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.core.registry.JndiBindingRegistry; import org.apache.activemq.core.remoting.impl.invm.InVMAcceptorFactory; @@ -41,6 +45,7 @@ import org.apache.activemq.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.jms.server.config.impl.JMSQueueConfigurationImpl; import org.apache.activemq.jms.server.config.impl.TopicConfigurationImpl; import org.apache.activemq.jms.server.impl.JMSServerManagerImpl; +import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase; import org.apache.activemq.tests.integration.largemessage.LargeMessageTestBase.TestLargeMessageInputStream; import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame; @@ -815,4 +820,137 @@ public class ExtraStompTest extends StompTestBase return server; } + static List incomingInterceptedFrames = new ArrayList(); + static List outgoingInterceptedFrames = new ArrayList(); + + public static class MyIncomingStompFrameInterceptor extends StompFrameInterceptor + { + @Override + public boolean intercept(StompFrame stompFrame, RemotingConnection connection) + { + incomingInterceptedFrames.add(stompFrame); + stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal"); + return true; + } + } + + public static class MyOutgoingStompFrameInterceptor extends StompFrameInterceptor + { + @Override + public boolean intercept(StompFrame stompFrame, RemotingConnection connection) + { + outgoingInterceptedFrames.add(stompFrame); + stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal"); + return true; + } + } + + @Test + public void stompFrameInterceptor() throws Exception + { + try + { + List incomingInterceptorList = new ArrayList(); + incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor"); + List outgoingInterceptorList = new ArrayList(); + outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor"); + + server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList); + server.start(); + + setUpAfterServer(); + + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(frame); + + frame = receiveFrame(100000); + + frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; + sendFrame(frame); + + sendMessage(getName()); + + receiveFrame(10000); + + frame = "SEND\n" + "destination:" + + getQueuePrefix() + + getQueueName() + + "\n\n" + + "Hello World" + + Stomp.NULL; + sendFrame(frame); + + receiveFrame(10000); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + + sendFrame(frame); + + } + finally + { + cleanUp(); + server.stop(); + } + + List incomingCommands = new ArrayList(4); + incomingCommands.add("CONNECT"); + incomingCommands.add("SUBSCRIBE"); + incomingCommands.add("SEND"); + incomingCommands.add("DISCONNECT"); + + List outgoingCommands = new ArrayList(3); + outgoingCommands.add("CONNECTED"); + outgoingCommands.add("MESSAGE"); + outgoingCommands.add("MESSAGE"); + + Assert.assertEquals(4, incomingInterceptedFrames.size()); + Assert.assertEquals(3, outgoingInterceptedFrames.size()); + + for (int i = 0; i < incomingInterceptedFrames.size(); i++) + { + Assert.assertEquals(incomingCommands.get(i), incomingInterceptedFrames.get(i).getCommand()); + Assert.assertEquals("incomingInterceptedVal", incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp")); + } + + for (int i = 0; i < outgoingInterceptedFrames.size(); i++) + { + Assert.assertEquals(outgoingCommands.get(i), outgoingInterceptedFrames.get(i).getCommand()); + } + + Assert.assertEquals("incomingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp")); + Assert.assertEquals("outgoingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp")); + } + + protected JMSServerManager createServerWithStompInterceptor(List stompIncomingInterceptor, List stompOutgoingInterceptor) throws Exception + { + + Map params = new HashMap(); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); + params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); + params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); + TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); + + Configuration config = createBasicConfig() + .setPersistenceEnabled(false) + .addAcceptorConfiguration(stompTransport) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) + .setIncomingInterceptorClassNames(stompIncomingInterceptor) + .setOutgoingInterceptorClassNames(stompOutgoingInterceptor); + + ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); + + JMSConfiguration jmsConfig = new JMSConfigurationImpl(); + jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl() + .setName(getQueueName()) + .setDurable(false) + .setBindings(getQueueName())); + jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl() + .setName(getTopicName()) + .setBindings(getTopicName())); + server = new JMSServerManagerImpl(hornetQServer, jmsConfig); + server.setRegistry(new JndiBindingRegistry(new InVMNamingContext())); + return server; + } + } From 519a47f023ff24a474160dd6e61c2809dbdd5764 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 16 Mar 2015 15:11:25 -0400 Subject: [PATCH 2/3] ACTIVEMQ6-89 Refactored stomp support contributed https://issues.apache.org/jira/browse/ACTIVEMQ6-89 I have done a lot of refactoring on this. So we can a different version of the interceptor for each protocol based on a base class now. Just an abstract class over Stomp would be a bit hacky... this is a better approach. --- .../activemq/api/core/BaseInterceptor.java | 35 +++++ .../apache/activemq/api/core/Interceptor.java | 13 +- .../proton/ProtonProtocolManager.java | 26 +++- .../proton/ProtonProtocolManagerFactory.java | 15 +- .../openwire/OpenWireProtocolManager.java | 141 ++++++++++-------- .../OpenWireProtocolManagerFactory.java | 14 +- .../protocol/stomp/StompFrameInterceptor.java | 33 +--- .../protocol/stomp/StompProtocolManager.java | 56 ++++--- .../stomp/StompProtocolManagerFactory.java | 16 +- .../core/impl/CoreProtocolManager.java | 30 +++- .../core/impl/CoreProtocolManagerFactory.java | 23 ++- .../server/impl/RemotingServiceImpl.java | 55 +++++-- .../AbstractProtocolManagerFactory.java | 57 +++++++ .../spi/core/protocol/ProtocolManager.java | 16 +- .../core/protocol/ProtocolManagerFactory.java | 22 ++- .../user-manual/en/intercepting-operations.md | 4 +- .../integration/stomp/ExtraStompTest.java | 98 ++++++++---- .../integration/stomp/StompTestBase.java | 2 +- 18 files changed, 463 insertions(+), 193 deletions(-) create mode 100644 activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java create mode 100644 activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java new file mode 100644 index 0000000000..57c788b368 --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/BaseInterceptor.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.api.core; + +import org.apache.activemq.spi.core.protocol.RemotingConnection; + +public interface BaseInterceptor

+{ + /** + * Intercepts a packet which is received before it is sent to the channel + * + * @param packet the packet being received + * @param connection the connection the packet was received on + * @return {@code true} to process the next interceptor and handle the packet, + * {@code false} to abort processing of the packet + * @throws ActiveMQException + */ + boolean intercept(P packet, RemotingConnection connection) throws ActiveMQException; + +} diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java index a1fa94a4ad..7cfee0b58d 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/Interceptor.java @@ -17,7 +17,6 @@ package org.apache.activemq.api.core; import org.apache.activemq.core.protocol.core.Packet; -import org.apache.activemq.spi.core.protocol.RemotingConnection; /** * This is class is a simple way to intercepting calls on ActiveMQ client and servers. @@ -26,16 +25,6 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection; * {@literal activemq-configuration.xml}.
* To add it to a client, use {@link org.apache.activemq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)} */ -public interface Interceptor +public interface Interceptor extends BaseInterceptor { - /** - * Intercepts a packet which is received before it is sent to the channel - * - * @param packet the packet being received - * @param connection the connection the packet was received on - * @return {@code true} to process the next interceptor and handle the packet, - * {@code false} to abort processing of the packet - * @throws ActiveMQException - */ - boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException; } diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java index b3f3211153..dd88cab598 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManager.java @@ -16,10 +16,13 @@ */ package org.apache.activemq.core.protocol.proton; +import java.util.List; import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.BaseInterceptor; +import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.protocol.proton.converter.ProtonMessageConverter; import org.apache.activemq.core.protocol.proton.plug.ActiveMQProtonConnectionCallback; @@ -30,6 +33,7 @@ import org.apache.activemq.core.server.management.NotificationListener; import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.ProtocolManager; +import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; @@ -39,14 +43,18 @@ import org.proton.plug.context.server.ProtonServerConnectionContextFactory; /** * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ resources */ -public class ProtonProtocolManager implements ProtocolManager, NotificationListener +public class ProtonProtocolManager implements ProtocolManager, NotificationListener { private final ActiveMQServer server; private MessageConverter protonConverter; - public ProtonProtocolManager(ActiveMQServer server) + + private final ProtonProtocolManagerFactory factory; + + public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) { + this.factory = factory; this.server = server; this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); } @@ -69,6 +77,18 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe } + @Override + public ProtocolManagerFactory getFactory() + { + return factory; + } + + @Override + public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) + { + // no op + } + @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { @@ -97,7 +117,7 @@ public class ProtonProtocolManager implements ProtocolManager, NotificationListe @Override public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) { - ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection)connection; + ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection; protonConnection.bufferReceived(protonConnection.getID(), buffer); } diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java index c83f10cd3e..6fe25765ca 100644 --- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java +++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/ProtonProtocolManagerFactory.java @@ -16,14 +16,16 @@ */ package org.apache.activemq.core.protocol.proton; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.ProtocolManager; -import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; +import java.util.Collections; import java.util.List; -public class ProtonProtocolManagerFactory implements ProtocolManagerFactory +public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory { private static final String AMQP_PROTOCOL_NAME = "AMQP"; @@ -32,7 +34,14 @@ public class ProtonProtocolManagerFactory implements ProtocolManagerFactory @Override public ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors) { - return new ProtonProtocolManager(server); + return new ProtonProtocolManager(this, server); + } + + @Override + public List filterInterceptors(List interceptors) + { + // no interceptors on Proton + return Collections.emptyList(); } @Override diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java index dfa4e4a8fa..74d7d77487 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.core.protocol.openwire; +import javax.jms.InvalidClientIDException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -27,12 +28,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import javax.jms.InvalidClientIDException; - import io.netty.channel.ChannelPipeline; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.BaseInterceptor; +import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -54,16 +55,6 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.core.server.ActiveMQServerLogger; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.state.ConnectionState; -import org.apache.activemq.state.ProducerState; -import org.apache.activemq.state.SessionState; -import org.apache.activemq.util.IdGenerator; -import org.apache.activemq.util.InetAddressUtil; -import org.apache.activemq.util.LongSequenceGenerator; -import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.core.journal.IOAsyncTask; import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.core.protocol.openwire.amq.AMQPersistenceAdapter; @@ -74,16 +65,26 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.core.security.CheckType; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.ProtocolManager; +import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.security.ActiveMQSecurityManager; +import org.apache.activemq.state.ConnectionState; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.state.SessionState; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.InetAddressUtil; +import org.apache.activemq.util.LongSequenceGenerator; -public class OpenWireProtocolManager implements ProtocolManager +public class OpenWireProtocolManager implements ProtocolManager { private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator(); @@ -91,6 +92,8 @@ public class OpenWireProtocolManager implements ProtocolManager private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final ActiveMQServer server; + private final OpenWireProtocolManagerFactory factory; + private OpenWireFormatFactory wireFactory; private boolean tightEncodingEnabled = true; @@ -104,7 +107,7 @@ public class OpenWireProtocolManager implements ProtocolManager // from broker protected final Map brokerConnectionStates = Collections - .synchronizedMap(new HashMap()); + .synchronizedMap(new HashMap()); private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); @@ -118,8 +121,9 @@ public class OpenWireProtocolManager implements ProtocolManager private Map transactions = new ConcurrentHashMap(); - public OpenWireProtocolManager(ActiveMQServer server) + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { + this.factory = factory; this.server = server; this.wireFactory = new OpenWireFormatFactory(); // preferred prop, should be done via config @@ -128,17 +132,30 @@ public class OpenWireProtocolManager implements ProtocolManager advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); } + + public ProtocolManagerFactory getFactory() + { + return factory; + } + + + @Override + public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) + { + // NO-OP + } + @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, - Connection connection) + Connection connection) { OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, - connection, this, wf); + connection, this, wf); owConn.init(); return new ConnectionEntry(owConn, null, System.currentTimeMillis(), - 1 * 60 * 1000); + 1 * 60 * 1000); } @Override @@ -171,7 +188,7 @@ public class OpenWireProtocolManager implements ProtocolManager if (array.length < 8) { throw new IllegalArgumentException("Protocol header length changed " - + array.length); + + array.length); } int start = this.prefixPacketSize ? 4 : 0; @@ -207,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager } public void handleCommand(OpenWireConnection openWireConnection, - Object command) + Object command) { Command amqCmd = (Command) command; byte type = amqCmd.getDataStructureType(); @@ -221,14 +238,14 @@ public class OpenWireProtocolManager implements ProtocolManager } public void sendReply(final OpenWireConnection connection, - final Command command) + final Command command) { server.getStorageManager().afterCompleteOperations(new IOAsyncTask() { public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, - errorMessage); + errorMessage); } public void done() @@ -285,7 +302,7 @@ public class OpenWireProtocolManager implements ProtocolManager if (clientId == null) { throw new InvalidClientIDException( - "No clientID specified for connection request"); + "No clientID specified for connection request"); } synchronized (clientIdSet) { @@ -308,8 +325,8 @@ public class OpenWireProtocolManager implements ProtocolManager else { throw new InvalidClientIDException("Broker: " + getBrokerName() - + " - Client: " + clientId + " already connected from " - + oldContext.getConnection().getRemoteAddress()); + + " - Client: " + clientId + " already connected from " + + oldContext.getConnection().getRemoteAddress()); } } else @@ -329,11 +346,11 @@ public class OpenWireProtocolManager implements ProtocolManager // init the conn addSessions(context.getConnection(), context.getConnectionState() - .getSessionIds()); + .getSessionIds()); } private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, - Command copy) throws Exception + Command copy) throws Exception { this.fireAdvisory(context, topic, copy, null); } @@ -351,26 +368,26 @@ public class OpenWireProtocolManager implements ProtocolManager * See AdvisoryBroker.fireAdvisory() */ private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, - Command command, ConsumerId targetConsumerId) throws Exception + Command command, ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setStringProperty( - AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; advisoryMessage.setStringProperty( - AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); String url = "tcp://localhost:61616"; advisoryMessage.setStringProperty( - AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); // set the data structure advisoryMessage.setDataStructure(command); advisoryMessage.setPersistent(false); advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); advisoryMessage.setMessageId(new MessageId(advisoryProducerId, - messageIdGenerator.getNextSequenceId())); + messageIdGenerator.getNextSequenceId())); advisoryMessage.setTargetConsumerId(targetConsumerId); advisoryMessage.setDestination(topic); advisoryMessage.setResponseRequired(false); @@ -402,7 +419,7 @@ public class OpenWireProtocolManager implements ProtocolManager try { brokerName = InetAddressUtil.getLocalHostName().toLowerCase( - Locale.ENGLISH); + Locale.ENGLISH); } catch (Exception e) { @@ -445,34 +462,34 @@ public class OpenWireProtocolManager implements ProtocolManager SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); AMQTransportConnectionState cs = theConn - .lookupConnectionState(connectionId); + .lookupConnectionState(connectionId); if (cs == null) { throw new IllegalStateException( - "Cannot add a producer to a connection that had not been registered: " - + connectionId); + "Cannot add a producer to a connection that had not been registered: " + + connectionId); } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException( - "Cannot add a producer to a session that had not been registered: " - + sessionId); + "Cannot add a producer to a session that had not been registered: " + + sessionId); } // Avoid replaying dup commands if (!ss.getProducerIds().contains(info.getProducerId())) { ActiveMQDestination destination = info.getDestination(); if (destination != null - && !AdvisorySupport.isAdvisoryTopic(destination)) + && !AdvisorySupport.isAdvisoryTopic(destination)) { if (theConn.getProducerCount(connectionId) >= theConn - .getMaximumProducersAllowedPerConnection()) + .getMaximumProducersAllowedPerConnection()) { throw new IllegalStateException( - "Can't add producer on connection " + connectionId - + ": at maximum limit: " - + theConn.getMaximumProducersAllowedPerConnection()); + "Can't add producer on connection " + connectionId + + ": at maximum limit: " + + theConn.getMaximumProducersAllowedPerConnection()); } } @@ -503,35 +520,35 @@ public class OpenWireProtocolManager implements ProtocolManager SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); AMQTransportConnectionState cs = theConn - .lookupConnectionState(connectionId); + .lookupConnectionState(connectionId); if (cs == null) { throw new IllegalStateException( - "Cannot add a consumer to a connection that had not been registered: " - + connectionId); + "Cannot add a consumer to a connection that had not been registered: " + + connectionId); } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException( - this.server - + " Cannot add a consumer to a session that had not been registered: " - + sessionId); + this.server + + " Cannot add a consumer to a session that had not been registered: " + + sessionId); } // Avoid replaying dup commands if (!ss.getConsumerIds().contains(info.getConsumerId())) { ActiveMQDestination destination = info.getDestination(); if (destination != null - && !AdvisorySupport.isAdvisoryTopic(destination)) + && !AdvisorySupport.isAdvisoryTopic(destination)) { if (theConn.getConsumerCount(connectionId) >= theConn - .getMaximumConsumersAllowedPerConnection()) + .getMaximumConsumersAllowedPerConnection()) { throw new IllegalStateException( - "Can't add consumer on connection " + connectionId - + ": at maximum limit: " - + theConn.getMaximumConsumersAllowedPerConnection()); + "Can't add consumer on connection " + connectionId + + ": at maximum limit: " + + theConn.getMaximumConsumersAllowedPerConnection()); } } @@ -562,7 +579,7 @@ public class OpenWireProtocolManager implements ProtocolManager { SessionId sid = iter.next(); addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), - true); + true); } } @@ -572,10 +589,10 @@ public class OpenWireProtocolManager implements ProtocolManager } public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, - boolean internal) + boolean internal) { AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, - server, theConn, this); + server, theConn, this); amqSession.initialize(); amqSession.setInternal(internal); sessions.put(ss.getSessionId(), amqSession); @@ -583,7 +600,7 @@ public class OpenWireProtocolManager implements ProtocolManager } public void removeConnection(AMQConnectionContext context, - ConnectionInfo info, Throwable error) + ConnectionInfo info, Throwable error) { // todo roll back tx this.connections.remove(context.getConnection()); @@ -630,13 +647,13 @@ public class OpenWireProtocolManager implements ProtocolManager } public void addDestination(OpenWireConnection connection, - DestinationInfo info) throws Exception + DestinationInfo info) throws Exception { ActiveMQDestination dest = info.getDestination(); if (dest.isQueue()) { SimpleString qName = new SimpleString("jms.queue." - + dest.getPhysicalName()); + + dest.getPhysicalName()); ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId()); ConnectionInfo connInfo = state.getInfo(); if (connInfo != null) @@ -646,7 +663,7 @@ public class OpenWireProtocolManager implements ProtocolManager AMQServerSession fakeSession = new AMQServerSession(user, pass); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; - ((ActiveMQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession); + ((ActiveMQServerImpl) server).getSecurityStore().check(qName, checkType, fakeSession); } this.server.createQueue(qName, qName, null, false, true); if (dest.isTemporary()) diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java index 42f8f4d816..5593ee6200 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManagerFactory.java @@ -16,14 +16,16 @@ */ package org.apache.activemq.core.protocol.openwire; +import java.util.Collections; import java.util.List; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.ProtocolManager; -import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; -public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory +public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory { public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE"; @@ -31,7 +33,13 @@ public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - return new OpenWireProtocolManager(server); + return new OpenWireProtocolManager(this, server); + } + + @Override + public List filterInterceptors(List interceptors) + { + return Collections.emptyList(); } @Override diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java index 9af6e2772f..619c29fe67 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompFrameInterceptor.java @@ -16,10 +16,7 @@ */ package org.apache.activemq.core.protocol.stomp; -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.Interceptor; -import org.apache.activemq.core.protocol.core.Packet; -import org.apache.activemq.spi.core.protocol.RemotingConnection; +import org.apache.activemq.api.core.BaseInterceptor; /** * This class is a simple way to intercepting client calls on ActiveMQ using STOMP protocol. @@ -27,32 +24,6 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection; * To add an interceptor to ActiveMQ server, you have to modify the server configuration file * {@literal activemq-configuration.xml}.
*/ -public abstract class StompFrameInterceptor implements Interceptor +public interface StompFrameInterceptor extends BaseInterceptor { - - /** - * Intercepts a packet which is received before it is sent to the channel. - * By default does not do anything and returns true allowing other interceptors perform logic. - * - * @param packet the packet being received - * @param connection the connection the packet was received on - * @return {@code true} to process the next interceptor and handle the packet, - * {@code false} to abort processing of the packet - * @throws ActiveMQException - */ - @Override - public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException - { - return true; - } - - /** - * Intercepts a stomp frame sent by a client. - * - * @param stompFrame the stomp frame being received - * @param connection the connection the stomp frame was received on - * @return {@code true} to process the next interceptor and handle the stomp frame, - * {@code false} to abort processing of the stomp frame - */ - public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); } diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java index 9fb0362092..3780180faf 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java @@ -26,10 +26,9 @@ import java.util.Set; import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; - import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQExceptionType; -import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.management.CoreNotificationType; @@ -49,6 +48,7 @@ import org.apache.activemq.core.server.management.NotificationListener; import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.ProtocolManager; +import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; @@ -62,7 +62,7 @@ import static org.apache.activemq.core.protocol.stomp.ActiveMQStompProtocolMessa /** * StompProtocolManager */ -class StompProtocolManager implements ProtocolManager, NotificationListener +class StompProtocolManager implements ProtocolManager, NotificationListener { // Constants ----------------------------------------------------- @@ -70,6 +70,8 @@ class StompProtocolManager implements ProtocolManager, NotificationListener private final ActiveMQServer server; + private final StompProtocolManagerFactory factory; + private final Executor executor; private final Map transactedSessions = new HashMap(); @@ -79,15 +81,16 @@ class StompProtocolManager implements ProtocolManager, NotificationListener private final Set destinations = new ConcurrentHashSet(); - private final List incomingInterceptors; - private final List outgoingInterceptors; + private final List incomingInterceptors; + private final List outgoingInterceptors; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- - public StompProtocolManager(final ActiveMQServer server, final List incomingInterceptors, final List outgoingInterceptors) + public StompProtocolManager(final StompProtocolManagerFactory factory, final ActiveMQServer server, final List incomingInterceptors, final List outgoingInterceptors) { + this.factory = factory; this.server = server; this.executor = server.getExecutorFactory().getExecutor(); ManagementService service = server.getManagementService(); @@ -101,6 +104,22 @@ class StompProtocolManager implements ProtocolManager, NotificationListener this.outgoingInterceptors = outgoingInterceptors; } + @Override + public ProtocolManagerFactory getFactory() + { + return factory; + } + + @Override + public void updateInterceptors(List incoming, List outgoing) + { + this.incomingInterceptors.clear(); + this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming)); + + this.outgoingInterceptors.clear(); + this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing)); + } + @Override public MessageConverter getConverter() { @@ -345,7 +364,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); ActiveMQStompException e = new ActiveMQStompException("Error sending reply", - ActiveMQExceptionType.createException(errorCode, errorMessage)); + ActiveMQExceptionType.createException(errorCode, errorMessage)); StompFrame error = e.getFrame(); send(connection, error); @@ -419,7 +438,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener if (stompSession.containsSubscription(subscriptionID)) { throw new ActiveMQStompException("There already is a subscription for: " + subscriptionID + - ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); + ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); } long consumerID = server.getStorageManager().generateID(); String clientID = (connection.getClientID() != null) ? connection.getClientID() : null; @@ -514,26 +533,23 @@ class StompProtocolManager implements ProtocolManager, NotificationListener return server; } - private void invokeInterceptors(List interceptors, final StompFrame frame, final StompConnection connection) + private void invokeInterceptors(List interceptors, final StompFrame frame, final StompConnection connection) { if (interceptors != null && !interceptors.isEmpty()) { - for (Interceptor interceptor : interceptors) + for (StompFrameInterceptor interceptor : interceptors) { - if (interceptor instanceof StompFrameInterceptor) + try { - try + if (!interceptor.intercept(frame, connection)) { - if (!((StompFrameInterceptor)interceptor).intercept(frame, connection)) - { - break; - } - } - catch (Exception e) - { - ActiveMQServerLogger.LOGGER.error(e); + break; } } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.error(e); + } } } } diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java index 8cf5e6ebae..1edf561857 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManagerFactory.java @@ -18,20 +18,26 @@ package org.apache.activemq.core.protocol.stomp; import java.util.List; -import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.ProtocolManager; -import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; -public class StompProtocolManagerFactory implements ProtocolManagerFactory +public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory { public static final String STOMP_PROTOCOL_NAME = "STOMP"; private static String[] SUPPORTED_PROTOCOLS = {STOMP_PROTOCOL_NAME}; - public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) + public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - return new StompProtocolManager(server, incomingInterceptors, outgoingInterceptors); + return new StompProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); + } + + @Override + public List filterInterceptors(List interceptors) + { + return filterInterceptors(StompFrameInterceptor.class, interceptors); } @Override diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java index 0785e3a0c2..0768ccf1b7 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManager.java @@ -25,11 +25,12 @@ import java.util.concurrent.RejectedExecutionException; import io.netty.channel.ChannelPipeline; import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.Pair; import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ClusterTopologyListener; import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.client.ClusterTopologyListener; import org.apache.activemq.api.core.client.TopologyMember; import org.apache.activemq.core.config.Configuration; import org.apache.activemq.core.protocol.ServerPacketDecoder; @@ -53,11 +54,12 @@ import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.MessageConverter; import org.apache.activemq.spi.core.protocol.ProtocolManager; +import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; -class CoreProtocolManager implements ProtocolManager +class CoreProtocolManager implements ProtocolManager { private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -67,8 +69,12 @@ class CoreProtocolManager implements ProtocolManager private final List outgoingInterceptors; - CoreProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) + private final CoreProtocolManagerFactory protocolManagerFactory; + + CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { + this.protocolManagerFactory = factory; + this.server = server; this.incomingInterceptors = incomingInterceptors; @@ -76,8 +82,26 @@ class CoreProtocolManager implements ProtocolManager this.outgoingInterceptors = outgoingInterceptors; } + + @Override + public ProtocolManagerFactory getFactory() + { + return protocolManagerFactory; + } + + @Override + public void updateInterceptors(List incoming, List outgoing) + { + this.incomingInterceptors.clear(); + this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming)); + + this.outgoingInterceptors.clear(); + this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing)); + } + /** * no need to implement this now + * * @return */ @Override diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java index e9d0ac5fbb..147abe09a6 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/CoreProtocolManagerFactory.java @@ -18,19 +18,36 @@ package org.apache.activemq.core.protocol.core.impl; import java.util.List; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.spi.core.protocol.ProtocolManager; -import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; -public class CoreProtocolManagerFactory implements ProtocolManagerFactory +public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory { private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL}; + /** + * {@inheritDoc} * + * @param server + * @param incomingInterceptors + * @param outgoingInterceptors + * @return + */ public ProtocolManager createProtocolManager(final ActiveMQServer server, final List incomingInterceptors, List outgoingInterceptors) { - return new CoreProtocolManager(server, incomingInterceptors, outgoingInterceptors); + return new CoreProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); + } + + @Override + public List filterInterceptors(List interceptors) + { + // This is using this tool method + // it wouldn't be possible to write a generic method without this class parameter + // and I didn't want to bloat the cllaers for this + return filterInterceptors(Interceptor.class, interceptors); } @Override diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java index 10b01062b1..147b39add7 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQInterruptedException; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.core.config.Configuration; @@ -53,8 +54,8 @@ import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.core.server.ActiveMQServerLogger; import org.apache.activemq.core.server.cluster.ClusterConnection; import org.apache.activemq.core.server.cluster.ClusterManager; -import org.apache.activemq.core.server.impl.ServiceRegistry; import org.apache.activemq.core.server.impl.ServerSessionImpl; +import org.apache.activemq.core.server.impl.ServiceRegistry; import org.apache.activemq.core.server.management.ManagementService; import org.apache.activemq.spi.core.protocol.ConnectionEntry; import org.apache.activemq.spi.core.protocol.ProtocolManager; @@ -65,9 +66,9 @@ import org.apache.activemq.spi.core.remoting.AcceptorFactory; import org.apache.activemq.spi.core.remoting.BufferHandler; import org.apache.activemq.spi.core.remoting.Connection; import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.utils.ActiveMQThreadFactory; import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.ConfigurationHelper; -import org.apache.activemq.utils.ActiveMQThreadFactory; import org.apache.activemq.utils.ReusableLatch; public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener @@ -84,9 +85,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private final Set acceptorsConfig; - private final List incomingInterceptors = new CopyOnWriteArrayList(); + private final List incomingInterceptors = new CopyOnWriteArrayList<>(); - private final List outgoingInterceptors = new CopyOnWriteArrayList(); + private final List outgoingInterceptors = new CopyOnWriteArrayList<>(); private final Map acceptors = new HashMap(); @@ -147,7 +148,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]); this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], - coreProtocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); + coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), + coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors))); if (config.isResolveProtocols()) { @@ -160,7 +162,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle for (String protocol : protocols) { ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol); - protocolMap.put(protocol, next.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); + protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors), + next.filterInterceptors(outgoingInterceptors))); } } } @@ -190,11 +193,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors()); } - private void addReflectivelyInstantiatedInterceptors(List classNames, List interceptors) + private void addReflectivelyInstantiatedInterceptors(List classNames, List interceptors) { for (String className : classNames) { - Interceptor interceptor = ((Interceptor) safeInitNewInstance(className)); + BaseInterceptor interceptor = ((BaseInterceptor) safeInitNewInstance(className)); interceptors.add(interceptor); } } @@ -221,8 +224,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle // to support many hundreds of connections, but the main thread pool must be kept small for better performance ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() + - "-" + - System.identityHashCode(this), false, tccl); + "-" + + System.identityHashCode(this), false, tccl); threadPool = Executors.newCachedThreadPool(tFactory); @@ -620,24 +623,43 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle public void addIncomingInterceptor(final Interceptor interceptor) { incomingInterceptors.add(interceptor); + + updateProtocols(); } @Override public boolean removeIncomingInterceptor(final Interceptor interceptor) { - return incomingInterceptors.remove(interceptor); + if (incomingInterceptors.remove(interceptor)) + { + updateProtocols(); + return true; + } + else + { + return false; + } } @Override public void addOutgoingInterceptor(final Interceptor interceptor) { outgoingInterceptors.add(interceptor); + updateProtocols(); } @Override public boolean removeOutgoingInterceptor(final Interceptor interceptor) { - return outgoingInterceptors.remove(interceptor); + if (outgoingInterceptors.remove(interceptor)) + { + updateProtocols(); + return true; + } + else + { + return false; + } } private ClusterConnection lookupClusterConnection(TransportConfiguration acceptorConfig) @@ -803,4 +825,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle }); } + protected void updateProtocols() + { + for (ProtocolManager protocolManager : this.protocolMap.values()) + { + protocolManager.updateInterceptors(incomingInterceptors, outgoingInterceptors); + } + + } + } diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java new file mode 100644 index 0000000000..df2d555ee9 --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/AbstractProtocolManagerFactory.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.spi.core.protocol; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.activemq.api.core.BaseInterceptor; + +public abstract class AbstractProtocolManagerFactory

implements ProtocolManagerFactory

+{ + + /** + * This method exists because java templates won't store the type of P at runtime. + * So it's not possible to write a generic method with having the Class Type. + * This will serve as a tool for sub classes to filter properly* * * + * + * @param type + * @param listIn + * @return + */ + protected List

filterInterceptors(Class

type, List listIn) + { + if (listIn == null) + { + return Collections.emptyList(); + } + else + { + CopyOnWriteArrayList

listOut = new CopyOnWriteArrayList(); + for (BaseInterceptor in : listIn) + { + if (type.isInstance(in)) + { + listOut.add((P) in); + } + } + return listOut; + } + } +} diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java index 254ac00a84..c07d1b751b 100644 --- a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManager.java @@ -16,14 +16,27 @@ */ package org.apache.activemq.spi.core.protocol; +import java.util.List; + import io.netty.channel.ChannelPipeline; import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.spi.core.remoting.Acceptor; import org.apache.activemq.spi.core.remoting.Connection; -public interface ProtocolManager +public interface ProtocolManager

{ + ProtocolManagerFactory

getFactory(); + + /** + * This method will receive all the interceptors on the system and you should filter them out * + * + * @param incomingInterceptors + * @param outgoingInterceptors + */ + void updateInterceptors(List incomingInterceptors, List outgoingInterceptors); + ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection); void removeHandler(final String name); @@ -37,6 +50,7 @@ public interface ProtocolManager /** * Gets the Message Converter towards ActiveMQ. * Notice this being null means no need to convert + * * @return */ MessageConverter getConverter(); diff --git a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java index 25f3d62fba..69f2426f4e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java +++ b/activemq-server/src/main/java/org/apache/activemq/spi/core/protocol/ProtocolManagerFactory.java @@ -18,12 +18,28 @@ package org.apache.activemq.spi.core.protocol; import java.util.List; -import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.api.core.BaseInterceptor; import org.apache.activemq.core.server.ActiveMQServer; -public interface ProtocolManagerFactory +public interface ProtocolManagerFactory

{ - ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors); + /** + * When you create the ProtocolManager, you should filter out any interceptors that won't belong + * to this Protocol. + * For example don't send any core Interceptors {@link org.apache.activemq.api.core.Interceptor} to Stomp * * * + * @param server + * @param incomingInterceptors + * @param outgoingInterceptors + * @return + */ + ProtocolManager createProtocolManager(ActiveMQServer server, List

incomingInterceptors, List

outgoingInterceptors); + + /** + * This should get the entire list and only return the ones this factory can deal with * + * @param interceptors + * @return + */ + List

filterInterceptors(List interceptors); String[] getProtocols(); } diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md index 145aba2d44..ad036880d8 100644 --- a/docs/user-manual/en/intercepting-operations.md +++ b/docs/user-manual/en/intercepting-operations.md @@ -20,12 +20,12 @@ public interface Interceptor } ``` -For stomp protocol an interceptor must extend the `StompFrameInterceptor class`: +For stomp protocol an interceptor must implement the `StompFrameInterceptor class`: ``` java package org.apache.activemq.core.protocol.stomp; -public abstract class StompFrameInterceptor +public interface StompFrameInterceptor { public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java index 8df0879909..17a19435ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/ExtraStompTest.java @@ -26,9 +26,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.activemq.api.core.Interceptor; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.protocol.core.Packet; import org.apache.activemq.core.protocol.stomp.Stomp; import org.apache.activemq.core.protocol.stomp.StompFrame; import org.apache.activemq.core.protocol.stomp.StompFrameInterceptor; @@ -820,11 +822,25 @@ public class ExtraStompTest extends StompTestBase return server; } - static List incomingInterceptedFrames = new ArrayList(); - static List outgoingInterceptedFrames = new ArrayList(); - public static class MyIncomingStompFrameInterceptor extends StompFrameInterceptor + public static class MyCoreInterceptor implements Interceptor { + static List incomingInterceptedFrames = new ArrayList(); + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) + { + incomingInterceptedFrames.add(packet); + return true; + } + + } + + + public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor + { + static List incomingInterceptedFrames = new ArrayList(); + @Override public boolean intercept(StompFrame stompFrame, RemotingConnection connection) { @@ -834,8 +850,12 @@ public class ExtraStompTest extends StompTestBase } } - public static class MyOutgoingStompFrameInterceptor extends StompFrameInterceptor + + public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor { + + static List outgoingInterceptedFrames = new ArrayList(); + @Override public boolean intercept(StompFrame stompFrame, RemotingConnection connection) { @@ -848,17 +868,23 @@ public class ExtraStompTest extends StompTestBase @Test public void stompFrameInterceptor() throws Exception { + MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear(); + MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear(); try { List incomingInterceptorList = new ArrayList(); incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor"); + incomingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor"); List outgoingInterceptorList = new ArrayList(); outgoingInterceptorList.add("org.apache.activemq.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor"); server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList); server.start(); - setUpAfterServer(); + setUpAfterServer(); // This will make some calls through core + + // So we clear them here + MyCoreInterceptor.incomingInterceptedFrames.clear(); String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; sendFrame(frame); @@ -868,16 +894,20 @@ public class ExtraStompTest extends StompTestBase frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; sendFrame(frame); + assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size()); sendMessage(getName()); + // Something was supposed to be called on sendMessages + assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0); + receiveFrame(10000); frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; + getQueuePrefix() + + getQueueName() + + "\n\n" + + "Hello World" + + Stomp.NULL; sendFrame(frame); receiveFrame(10000); @@ -904,22 +934,32 @@ public class ExtraStompTest extends StompTestBase outgoingCommands.add("MESSAGE"); outgoingCommands.add("MESSAGE"); - Assert.assertEquals(4, incomingInterceptedFrames.size()); - Assert.assertEquals(3, outgoingInterceptedFrames.size()); + long timeout = System.currentTimeMillis() + 1000; - for (int i = 0; i < incomingInterceptedFrames.size(); i++) + // Things are async, giving some time to things arrive before we actually assert + while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 && + MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 && + timeout > System.currentTimeMillis()) { - Assert.assertEquals(incomingCommands.get(i), incomingInterceptedFrames.get(i).getCommand()); - Assert.assertEquals("incomingInterceptedVal", incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp")); + Thread.sleep(10); } - for (int i = 0; i < outgoingInterceptedFrames.size(); i++) + Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size()); + Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size()); + + for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) { - Assert.assertEquals(outgoingCommands.get(i), outgoingInterceptedFrames.get(i).getCommand()); + Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand()); + Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp")); } - Assert.assertEquals("incomingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp")); - Assert.assertEquals("outgoingInterceptedVal", outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp")); + for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) + { + Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand()); + } + + Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp")); + Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp")); } protected JMSServerManager createServerWithStompInterceptor(List stompIncomingInterceptor, List stompOutgoingInterceptor) throws Exception @@ -932,22 +972,22 @@ public class ExtraStompTest extends StompTestBase TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); Configuration config = createBasicConfig() - .setPersistenceEnabled(false) - .addAcceptorConfiguration(stompTransport) - .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) - .setIncomingInterceptorClassNames(stompIncomingInterceptor) - .setOutgoingInterceptorClassNames(stompOutgoingInterceptor); + .setPersistenceEnabled(false) + .addAcceptorConfiguration(stompTransport) + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)) + .setIncomingInterceptorClassNames(stompIncomingInterceptor) + .setOutgoingInterceptorClassNames(stompOutgoingInterceptor); ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); JMSConfiguration jmsConfig = new JMSConfigurationImpl(); jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl() - .setName(getQueueName()) - .setDurable(false) - .setBindings(getQueueName())); + .setName(getQueueName()) + .setDurable(false) + .setBindings(getQueueName())); jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl() - .setName(getTopicName()) - .setBindings(getTopicName())); + .setName(getTopicName()) + .setBindings(getTopicName())); server = new JMSServerManagerImpl(hornetQServer, jmsConfig); server.setRegistry(new JndiBindingRegistry(new InVMNamingContext())); return server; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java index 2387c719a5..937b85d2f7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java @@ -182,10 +182,10 @@ public abstract class StompTestBase extends UnitTestCase createBootstrap(); connection = connectionFactory.createConnection(); + connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue(getQueueName()); topic = session.createTopic(getTopicName()); - connection.start(); } From 2f819a63e7afa734d096156277358933c3311c45 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 16 Mar 2015 19:17:52 -0400 Subject: [PATCH 3/3] Improving a test that failed due to what seemed a race --- .../tests/unit/core/server/impl/QueueImplTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java index 8286ab665e..3be1f611ad 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/QueueImplTest.java @@ -625,8 +625,13 @@ public class QueueImplTest extends UnitTestCase queue.resume(); // Need to make sure the consumers will receive the messages before we do these assertions - long timeout = System.currentTimeMillis() + 1000; - while (cons1.getReferences().size() != numMessages / 2 && cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis()) + long timeout = System.currentTimeMillis() + 5000; + while (cons1.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis()) + { + Thread.sleep(1); + } + + while (cons2.getReferences().size() != numMessages / 2 && timeout > System.currentTimeMillis()) { Thread.sleep(1); }