This closes #617

This commit is contained in:
Clebert Suconic 2016-07-05 19:43:00 -04:00
commit 98fa433bd1
10 changed files with 192 additions and 48 deletions

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
public interface MQTTInterceptor extends BaseInterceptor<MqttMessage> {
}

View File

@ -53,6 +53,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
private MQTTSession session;
private ActiveMQServer server;
private MQTTProtocolManager protocolManager;
// This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
private ChannelHandlerContext ctx;
@ -61,8 +62,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
private boolean stopped = false;
public MQTTProtocolHandler(ActiveMQServer server) {
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
this.server = server;
this.protocolManager = protocolManager;
}
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
@ -188,6 +190,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
}
void handlePublish(MqttPublishMessage message) throws Exception {
this.protocolManager.invokeIncoming(message, this.connection);
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
}
@ -281,6 +284,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
this.protocolManager.invokeOutgoing(publish, connection);
ctx.write(publish);
ctx.flush();

View File

@ -16,35 +16,42 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import java.util.ArrayList;
import java.util.List;
/**
* MQTTProtocolManager
*/
class MQTTProtocolManager implements ProtocolManager, NotificationListener {
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
implements NotificationListener {
private ActiveMQServer server;
private MQTTLogger log = MQTTLogger.LOGGER;
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
MQTTProtocolManager(ActiveMQServer server) {
MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
}
@Override
@ -58,8 +65,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
}
@Override
public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) {
// TODO handle interceptors
public void updateInterceptors(List incoming, List outgoing) {
this.incomingInterceptors.clear();
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
this.outgoingInterceptors.clear();
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}
@Override
@ -100,7 +111,7 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
pipeline.addLast(new MqttEncoder());
pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
pipeline.addLast(new MQTTProtocolHandler(server));
pipeline.addLast(new MQTTProtocolHandler(server, this));
}
@Override
@ -126,4 +137,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
}
public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}
}

View File

@ -22,12 +22,13 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
public class MQTTProtocolManagerFactory implements ProtocolManagerFactory<BaseInterceptor> {
public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<MQTTInterceptor> {
public static final String MQTT_PROTOCOL_NAME = "MQTT";
@ -40,13 +41,12 @@ public class MQTTProtocolManagerFactory implements ProtocolManagerFactory<BaseIn
final Map<String, Object> parameters,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
return new MQTTProtocolManager(server);
return new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors);
}
@Override
public List filterInterceptors(List list) {
// TODO Add support for interceptors.
return null;
public List<MQTTInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
return internalFilterInterceptors(MQTTInterceptor.class, interceptors);
}
@Override

View File

@ -37,9 +37,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@ -52,7 +52,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
/**
* StompProtocolManager
*/
class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@ -410,21 +410,4 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
public ActiveMQServer getServer() {
return server;
}
private void invokeInterceptors(List<StompFrameInterceptor> interceptors,
final StompFrame frame,
final StompConnection connection) {
if (interceptors != null && !interceptors.isEmpty()) {
for (StompFrameInterceptor interceptor : interceptors) {
try {
if (!interceptor.intercept(frame, connection)) {
break;
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.error(e);
}
}
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import java.util.List;
public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection>
implements ProtocolManager<I> {
protected void invokeInterceptors(final List<I> interceptors,
final P message,
final C connection) {
if (interceptors != null && !interceptors.isEmpty()) {
for (I interceptor : interceptors) {
try {
if (!interceptor.intercept(message, connection)) {
break;
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.error(e);
}
}
}
}
}

View File

@ -11,7 +11,7 @@ What does it mean to be reasonably confident? If the developer has run the same
builds are running they can be reasonably confident. Currently the [PR build](https://builds.apache.org/job/ActiveMQ-Artemis-PR-Build/)
runs this command:
mvn compile test-compile javadoc:javadoc -Pfast-tests -Pextra-tests test
mvn -Pfast-tests -Pextra-tests install
However, if the changes are significant, touches a wide area of code, or even if the developer just wants a second
opinion they are encouraged to engage other members of the community to obtain an additional review prior to pushing.

View File

@ -9,7 +9,9 @@ makes interceptors powerful, but also potentially dangerous.
## Implementing The Interceptors
An interceptor must implement the `Interceptor interface`:
All interceptors are protocol specific.
An interceptor for the core protocol must implement the interface `Interceptor`:
``` java
package org.apache.artemis.activemq.api.core.interceptor;
@ -20,14 +22,25 @@ public interface Interceptor
}
```
For stomp protocol an interceptor must implement the `StompFrameInterceptor class`:
For stomp protocol an interceptor must implement the interface `StompFrameInterceptor`:
``` java
package org.apache.activemq.artemis.core.protocol.stomp;
public interface StompFrameInterceptor
public interface StompFrameInterceptor extends BaseInterceptor<StompFrame>
{
public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
boolean intercept(StompFrame stompFrame, RemotingConnection connection);
}
```
Likewise for MQTT protocol, an interceptor must implement the interface `MQTTInterceptor`:
``` java
package org.apache.activemq.artemis.core.protocol.mqtt;
public interface MQTTInterceptor extends BaseInterceptor<MqttMessage>
{
boolean intercept(MqttMessage mqttMessage, RemotingConnection connection);
}
```

View File

@ -246,7 +246,9 @@ public class MQTTTest extends MQTTTestSupport {
}
@Test(timeout = 60 * 1000)
public void testSendAndReceiveExactlyOnce() throws Exception {
public void testSendAndReceiveExactlyOnceWithInterceptors() throws Exception {
MQTTIncomingInterceptor.clear();
MQTTOutoingInterceptor.clear();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
@ -263,6 +265,8 @@ public class MQTTTest extends MQTTTestSupport {
}
subscriber.disconnect();
publisher.disconnect();
assertEquals(NUM_MESSAGES, MQTTIncomingInterceptor.getMessageCount());
assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
}
@Test(timeout = 60 * 1000)
@ -380,7 +384,8 @@ public class MQTTTest extends MQTTTestSupport {
Message msg = connection.receive(5, TimeUnit.SECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
String msgPayload = new String(msg.getPayload());
assertTrue("RETAINED prefix " + wildcard + " msg " + msgPayload, msgPayload.startsWith(RETAINED));
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(5000, TimeUnit.MILLISECONDS);

View File

@ -29,13 +29,19 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
@ -47,6 +53,8 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
public class MQTTTestSupport extends ActiveMQTestBase {
private ActiveMQServer server;
@ -79,11 +87,6 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
public MQTTTestSupport(String connectorScheme, boolean useSSL) {
this.protocolScheme = connectorScheme;
this.useSSL = useSSL;
}
@Override
public String getName() {
return name.getMethodName();
@ -120,7 +123,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
public void startBroker() throws Exception {
// TODO Add SSL
super.setUp();
server = createServer(true, true);
server = createServerForMQTT();
addCoreConnector();
addMQTTConnector();
AddressSettings addressSettings = new AddressSettings();
@ -132,12 +135,19 @@ public class MQTTTestSupport extends ActiveMQTestBase {
server.waitForActivation(10, TimeUnit.SECONDS);
}
private ActiveMQServer createServerForMQTT() throws Exception {
Configuration defaultConfig = createDefaultConfig(true)
.setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName()))
.setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
return createServer(true, defaultConfig);
}
protected void addCoreConnector() throws Exception {
// Overrides of this method can add additional configuration options or add multiple
// MQTT transport connectors as needed, the port variable is always supposed to be
// assigned the primary MQTT connector's port.
HashMap<String, Object> params = new HashMap<>();
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "" + 5445);
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE");
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@ -151,7 +161,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
// MQTT transport connectors as needed, the port variable is always supposed to be
// assigned the primary MQTT connector's port.
HashMap<String, Object> params = new HashMap<>();
Map<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "" + port);
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@ -336,4 +346,42 @@ public class MQTTTestSupport extends ActiveMQTestBase {
return new X509Certificate[0];
}
}
public static class MQTTIncomingInterceptor implements MQTTInterceptor {
private static int messageCount = 0;
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
messageCount++;
return true;
}
public static void clear() {
messageCount = 0;
}
public static int getMessageCount() {
return messageCount;
}
}
public static class MQTTOutoingInterceptor implements MQTTInterceptor {
private static int messageCount = 0;
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
messageCount++;
return true;
}
public static void clear() {
messageCount = 0;
}
public static int getMessageCount() {
return messageCount;
}
}
}