diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java new file mode 100644 index 0000000000..ba22f25dc7 --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.activemq.artemis.core.protocol.mqtt; + +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.activemq.artemis.api.core.BaseInterceptor; + +public interface MQTTInterceptor extends BaseInterceptor { +} diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 306d1467f1..17fc978984 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -53,6 +53,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private MQTTSession session; private ActiveMQServer server; + private MQTTProtocolManager protocolManager; // This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx. private ChannelHandlerContext ctx; @@ -61,8 +62,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private boolean stopped = false; - public MQTTProtocolHandler(ActiveMQServer server) { + public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) { this.server = server; + this.protocolManager = protocolManager; } void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { @@ -188,6 +190,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handlePublish(MqttPublishMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain()); } @@ -281,6 +284,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); + this.protocolManager.invokeOutgoing(publish, connection); ctx.write(publish); ctx.flush(); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index d2c0793177..1d38fcf2b7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -16,35 +16,42 @@ */ package org.apache.activemq.artemis.core.protocol.mqtt; -import java.util.List; - import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; +import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import java.util.ArrayList; +import java.util.List; + /** * MQTTProtocolManager */ -class MQTTProtocolManager implements ProtocolManager, NotificationListener { +class MQTTProtocolManager extends AbstractProtocolManager + implements NotificationListener { private ActiveMQServer server; private MQTTLogger log = MQTTLogger.LOGGER; + private final List incomingInterceptors = new ArrayList<>(); + private final List outgoingInterceptors = new ArrayList<>(); - MQTTProtocolManager(ActiveMQServer server) { + MQTTProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors) { this.server = server; + this.updateInterceptors(incomingInterceptors, outgoingInterceptors); } @Override @@ -58,8 +65,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener { } @Override - public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) { - // TODO handle interceptors + public void updateInterceptors(List incoming, List outgoing) { + this.incomingInterceptors.clear(); + this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming)); + + this.outgoingInterceptors.clear(); + this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing)); } @Override @@ -100,7 +111,7 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener { pipeline.addLast(new MqttEncoder()); pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE)); - pipeline.addLast(new MQTTProtocolHandler(server)); + pipeline.addLast(new MQTTProtocolHandler(server, this)); } @Override @@ -126,4 +137,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener { @Override public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { } + + public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) { + super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection); + } + + public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) { + super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection); + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 982723f89b..58826f6b28 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -22,12 +22,13 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) -public class MQTTProtocolManagerFactory implements ProtocolManagerFactory { +public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory { public static final String MQTT_PROTOCOL_NAME = "MQTT"; @@ -40,13 +41,12 @@ public class MQTTProtocolManagerFactory implements ProtocolManagerFactory parameters, List incomingInterceptors, List outgoingInterceptors) { - return new MQTTProtocolManager(server); + return new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors); } @Override - public List filterInterceptors(List list) { - // TODO Add support for interceptors. - return null; + public List filterInterceptors(List interceptors) { + return internalFilterInterceptors(MQTTInterceptor.class, interceptors); } @Override diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 4b6f5ba1b2..601d8339a9 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -37,9 +37,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -52,7 +52,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto /** * StompProtocolManager */ -class StompProtocolManager implements ProtocolManager { +class StompProtocolManager extends AbstractProtocolManager { // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- @@ -410,21 +410,4 @@ class StompProtocolManager implements ProtocolManager { public ActiveMQServer getServer() { return server; } - - private void invokeInterceptors(List interceptors, - final StompFrame frame, - final StompConnection connection) { - if (interceptors != null && !interceptors.isEmpty()) { - for (StompFrameInterceptor interceptor : interceptors) { - try { - if (!interceptor.intercept(frame, connection)) { - break; - } - } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.error(e); - } - } - } - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java new file mode 100644 index 0000000000..ddefe86644 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.activemq.artemis.spi.core.protocol; + +import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; + +import java.util.List; + +public abstract class AbstractProtocolManager, C extends RemotingConnection> + implements ProtocolManager { + + protected void invokeInterceptors(final List interceptors, + final P message, + final C connection) { + if (interceptors != null && !interceptors.isEmpty()) { + for (I interceptor : interceptors) { + try { + if (!interceptor.intercept(message, connection)) { + break; + } + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.error(e); + } + } + } + } +} diff --git a/docs/hacking-guide/en/maintainers.md b/docs/hacking-guide/en/maintainers.md index e0e6dad3dd..4319f5aec4 100644 --- a/docs/hacking-guide/en/maintainers.md +++ b/docs/hacking-guide/en/maintainers.md @@ -11,7 +11,7 @@ What does it mean to be reasonably confident? If the developer has run the same builds are running they can be reasonably confident. Currently the [PR build](https://builds.apache.org/job/ActiveMQ-Artemis-PR-Build/) runs this command: - mvn compile test-compile javadoc:javadoc -Pfast-tests -Pextra-tests test + mvn -Pfast-tests -Pextra-tests install However, if the changes are significant, touches a wide area of code, or even if the developer just wants a second opinion they are encouraged to engage other members of the community to obtain an additional review prior to pushing. diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md index 6c6a4af6f7..2b1d0d471d 100644 --- a/docs/user-manual/en/intercepting-operations.md +++ b/docs/user-manual/en/intercepting-operations.md @@ -9,7 +9,9 @@ makes interceptors powerful, but also potentially dangerous. ## Implementing The Interceptors -An interceptor must implement the `Interceptor interface`: +All interceptors are protocol specific. + +An interceptor for the core protocol must implement the interface `Interceptor`: ``` java package org.apache.artemis.activemq.api.core.interceptor; @@ -20,14 +22,25 @@ public interface Interceptor } ``` -For stomp protocol an interceptor must implement the `StompFrameInterceptor class`: +For stomp protocol an interceptor must implement the interface `StompFrameInterceptor`: ``` java package org.apache.activemq.artemis.core.protocol.stomp; -public interface StompFrameInterceptor +public interface StompFrameInterceptor extends BaseInterceptor { - public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection); + boolean intercept(StompFrame stompFrame, RemotingConnection connection); +} +``` + +Likewise for MQTT protocol, an interceptor must implement the interface `MQTTInterceptor`: + +``` java +package org.apache.activemq.artemis.core.protocol.mqtt; + +public interface MQTTInterceptor extends BaseInterceptor +{ + boolean intercept(MqttMessage mqttMessage, RemotingConnection connection); } ``` diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index b305c80a70..1b0964baf0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -246,7 +246,9 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 60 * 1000) - public void testSendAndReceiveExactlyOnce() throws Exception { + public void testSendAndReceiveExactlyOnceWithInterceptors() throws Exception { + MQTTIncomingInterceptor.clear(); + MQTTOutoingInterceptor.clear(); final MQTTClientProvider publisher = getMQTTClientProvider(); initializeConnection(publisher); @@ -263,6 +265,8 @@ public class MQTTTest extends MQTTTestSupport { } subscriber.disconnect(); publisher.disconnect(); + assertEquals(NUM_MESSAGES, MQTTIncomingInterceptor.getMessageCount()); + assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount()); } @Test(timeout = 60 * 1000) @@ -380,7 +384,8 @@ public class MQTTTest extends MQTTTestSupport { Message msg = connection.receive(5, TimeUnit.SECONDS); do { assertNotNull("RETAINED null " + wildcard, msg); - assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); + String msgPayload = new String(msg.getPayload()); + assertTrue("RETAINED prefix " + wildcard + " msg " + msgPayload, msgPayload.startsWith(RETAINED)); assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches()); msg.ack(); msg = connection.receive(5000, TimeUnit.MILLISECONDS); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 73489af0f1..010949c190 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -29,13 +29,19 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.concurrent.TimeUnit; +import io.netty.handler.codec.mqtt.MqttMessage; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Tracer; @@ -47,6 +53,8 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Collections.singletonList; + public class MQTTTestSupport extends ActiveMQTestBase { private ActiveMQServer server; @@ -79,11 +87,6 @@ public class MQTTTestSupport extends ActiveMQTestBase { return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile(); } - public MQTTTestSupport(String connectorScheme, boolean useSSL) { - this.protocolScheme = connectorScheme; - this.useSSL = useSSL; - } - @Override public String getName() { return name.getMethodName(); @@ -120,7 +123,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { public void startBroker() throws Exception { // TODO Add SSL super.setUp(); - server = createServer(true, true); + server = createServerForMQTT(); addCoreConnector(); addMQTTConnector(); AddressSettings addressSettings = new AddressSettings(); @@ -132,12 +135,19 @@ public class MQTTTestSupport extends ActiveMQTestBase { server.waitForActivation(10, TimeUnit.SECONDS); } + private ActiveMQServer createServerForMQTT() throws Exception { + Configuration defaultConfig = createDefaultConfig(true) + .setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())) + .setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName())); + return createServer(true, defaultConfig); + } + protected void addCoreConnector() throws Exception { // Overrides of this method can add additional configuration options or add multiple // MQTT transport connectors as needed, the port variable is always supposed to be // assigned the primary MQTT connector's port. - HashMap params = new HashMap<>(); + Map params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, "" + 5445); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE"); TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); @@ -151,7 +161,7 @@ public class MQTTTestSupport extends ActiveMQTestBase { // MQTT transport connectors as needed, the port variable is always supposed to be // assigned the primary MQTT connector's port. - HashMap params = new HashMap<>(); + Map params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, "" + port); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT"); TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); @@ -336,4 +346,42 @@ public class MQTTTestSupport extends ActiveMQTestBase { return new X509Certificate[0]; } } + + public static class MQTTIncomingInterceptor implements MQTTInterceptor { + + private static int messageCount = 0; + + @Override + public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { + messageCount++; + return true; + } + + public static void clear() { + messageCount = 0; + } + + public static int getMessageCount() { + return messageCount; + } + } + + public static class MQTTOutoingInterceptor implements MQTTInterceptor { + + private static int messageCount = 0; + + @Override + public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { + messageCount++; + return true; + } + + public static void clear() { + messageCount = 0; + } + + public static int getMessageCount() { + return messageCount; + } + } }