ARTEMIS-607 Added interceptor support for MQTT protocol.
Also updated the maintainer's guide to clarify what is run in the PR builder.
This commit is contained in:
parent
05ac53a305
commit
7c746c719e
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
|
||||
public interface MQTTInterceptor extends BaseInterceptor<MqttMessage> {
|
||||
}
|
|
@ -53,6 +53,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
private MQTTSession session;
|
||||
|
||||
private ActiveMQServer server;
|
||||
private MQTTProtocolManager protocolManager;
|
||||
|
||||
// This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
|
||||
private ChannelHandlerContext ctx;
|
||||
|
@ -61,8 +62,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
private boolean stopped = false;
|
||||
|
||||
public MQTTProtocolHandler(ActiveMQServer server) {
|
||||
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
|
||||
this.server = server;
|
||||
this.protocolManager = protocolManager;
|
||||
}
|
||||
|
||||
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
|
||||
|
@ -188,6 +190,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
}
|
||||
|
||||
void handlePublish(MqttPublishMessage message) throws Exception {
|
||||
this.protocolManager.invokeIncoming(message, this.connection);
|
||||
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain());
|
||||
}
|
||||
|
||||
|
@ -281,6 +284,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
|
|||
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), false, 0);
|
||||
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
|
||||
MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
|
||||
this.protocolManager.invokeOutgoing(publish, connection);
|
||||
|
||||
ctx.write(publish);
|
||||
ctx.flush();
|
||||
|
|
|
@ -16,35 +16,42 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.handler.codec.mqtt.MqttDecoder;
|
||||
import io.netty.handler.codec.mqtt.MqttEncoder;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* MQTTProtocolManager
|
||||
*/
|
||||
class MQTTProtocolManager implements ProtocolManager, NotificationListener {
|
||||
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
|
||||
implements NotificationListener {
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
||||
private MQTTLogger log = MQTTLogger.LOGGER;
|
||||
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
|
||||
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();
|
||||
|
||||
MQTTProtocolManager(ActiveMQServer server) {
|
||||
MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
|
||||
this.server = server;
|
||||
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,8 +65,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) {
|
||||
// TODO handle interceptors
|
||||
public void updateInterceptors(List incoming, List outgoing) {
|
||||
this.incomingInterceptors.clear();
|
||||
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
|
||||
|
||||
this.outgoingInterceptors.clear();
|
||||
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -100,7 +111,7 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
|
|||
pipeline.addLast(new MqttEncoder());
|
||||
pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
|
||||
|
||||
pipeline.addLast(new MQTTProtocolHandler(server));
|
||||
pipeline.addLast(new MQTTProtocolHandler(server, this));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -126,4 +137,12 @@ class MQTTProtocolManager implements ProtocolManager, NotificationListener {
|
|||
@Override
|
||||
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
|
||||
}
|
||||
|
||||
public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
|
||||
super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
|
||||
}
|
||||
|
||||
public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
|
||||
super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,12 +22,13 @@ import java.util.Map;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
|
||||
@Component(service = ProtocolManagerFactory.class)
|
||||
public class MQTTProtocolManagerFactory implements ProtocolManagerFactory<BaseInterceptor> {
|
||||
public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<MQTTInterceptor> {
|
||||
|
||||
public static final String MQTT_PROTOCOL_NAME = "MQTT";
|
||||
|
||||
|
@ -40,13 +41,12 @@ public class MQTTProtocolManagerFactory implements ProtocolManagerFactory<BaseIn
|
|||
final Map<String, Object> parameters,
|
||||
List<BaseInterceptor> incomingInterceptors,
|
||||
List<BaseInterceptor> outgoingInterceptors) {
|
||||
return new MQTTProtocolManager(server);
|
||||
return new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List filterInterceptors(List list) {
|
||||
// TODO Add support for interceptors.
|
||||
return null;
|
||||
public List<MQTTInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
|
||||
return internalFilterInterceptors(MQTTInterceptor.class, interceptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -37,9 +37,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
|
@ -52,7 +52,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
|
|||
/**
|
||||
* StompProtocolManager
|
||||
*/
|
||||
class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
|
||||
class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
@ -410,21 +410,4 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
|
|||
public ActiveMQServer getServer() {
|
||||
return server;
|
||||
}
|
||||
|
||||
private void invokeInterceptors(List<StompFrameInterceptor> interceptors,
|
||||
final StompFrame frame,
|
||||
final StompConnection connection) {
|
||||
if (interceptors != null && !interceptors.isEmpty()) {
|
||||
for (StompFrameInterceptor interceptor : interceptors) {
|
||||
try {
|
||||
if (!interceptor.intercept(frame, connection)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.spi.core.protocol;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection>
|
||||
implements ProtocolManager<I> {
|
||||
|
||||
protected void invokeInterceptors(final List<I> interceptors,
|
||||
final P message,
|
||||
final C connection) {
|
||||
if (interceptors != null && !interceptors.isEmpty()) {
|
||||
for (I interceptor : interceptors) {
|
||||
try {
|
||||
if (!interceptor.intercept(message, connection)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,7 +11,7 @@ What does it mean to be reasonably confident? If the developer has run the same
|
|||
builds are running they can be reasonably confident. Currently the [PR build](https://builds.apache.org/job/ActiveMQ-Artemis-PR-Build/)
|
||||
runs this command:
|
||||
|
||||
mvn compile test-compile javadoc:javadoc -Pfast-tests -Pextra-tests test
|
||||
mvn -Pfast-tests -Pextra-tests install
|
||||
|
||||
However, if the changes are significant, touches a wide area of code, or even if the developer just wants a second
|
||||
opinion they are encouraged to engage other members of the community to obtain an additional review prior to pushing.
|
||||
|
|
|
@ -9,7 +9,9 @@ makes interceptors powerful, but also potentially dangerous.
|
|||
|
||||
## Implementing The Interceptors
|
||||
|
||||
An interceptor must implement the `Interceptor interface`:
|
||||
All interceptors are protocol specific.
|
||||
|
||||
An interceptor for the core protocol must implement the interface `Interceptor`:
|
||||
|
||||
``` java
|
||||
package org.apache.artemis.activemq.api.core.interceptor;
|
||||
|
@ -20,14 +22,25 @@ public interface Interceptor
|
|||
}
|
||||
```
|
||||
|
||||
For stomp protocol an interceptor must implement the `StompFrameInterceptor class`:
|
||||
For stomp protocol an interceptor must implement the interface `StompFrameInterceptor`:
|
||||
|
||||
``` java
|
||||
package org.apache.activemq.artemis.core.protocol.stomp;
|
||||
|
||||
public interface StompFrameInterceptor
|
||||
public interface StompFrameInterceptor extends BaseInterceptor<StompFrame>
|
||||
{
|
||||
public abstract boolean intercept(StompFrame stompFrame, RemotingConnection connection);
|
||||
boolean intercept(StompFrame stompFrame, RemotingConnection connection);
|
||||
}
|
||||
```
|
||||
|
||||
Likewise for MQTT protocol, an interceptor must implement the interface `MQTTInterceptor`:
|
||||
|
||||
``` java
|
||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||
|
||||
public interface MQTTInterceptor extends BaseInterceptor<MqttMessage>
|
||||
{
|
||||
boolean intercept(MqttMessage mqttMessage, RemotingConnection connection);
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
@ -246,7 +246,9 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendAndReceiveExactlyOnce() throws Exception {
|
||||
public void testSendAndReceiveExactlyOnceWithInterceptors() throws Exception {
|
||||
MQTTIncomingInterceptor.clear();
|
||||
MQTTOutoingInterceptor.clear();
|
||||
final MQTTClientProvider publisher = getMQTTClientProvider();
|
||||
initializeConnection(publisher);
|
||||
|
||||
|
@ -263,6 +265,8 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
}
|
||||
subscriber.disconnect();
|
||||
publisher.disconnect();
|
||||
assertEquals(NUM_MESSAGES, MQTTIncomingInterceptor.getMessageCount());
|
||||
assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
|
@ -380,7 +384,8 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
Message msg = connection.receive(5, TimeUnit.SECONDS);
|
||||
do {
|
||||
assertNotNull("RETAINED null " + wildcard, msg);
|
||||
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
|
||||
String msgPayload = new String(msg.getPayload());
|
||||
assertTrue("RETAINED prefix " + wildcard + " msg " + msgPayload, msgPayload.startsWith(RETAINED));
|
||||
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
|
||||
msg.ack();
|
||||
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
|
||||
|
|
|
@ -29,13 +29,19 @@ import java.security.cert.CertificateException;
|
|||
import java.security.cert.X509Certificate;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.fusesource.mqtt.client.MQTT;
|
||||
import org.fusesource.mqtt.client.Tracer;
|
||||
|
@ -47,6 +53,8 @@ import org.junit.rules.TestName;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
public class MQTTTestSupport extends ActiveMQTestBase {
|
||||
|
||||
private ActiveMQServer server;
|
||||
|
@ -79,11 +87,6 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
|
||||
}
|
||||
|
||||
public MQTTTestSupport(String connectorScheme, boolean useSSL) {
|
||||
this.protocolScheme = connectorScheme;
|
||||
this.useSSL = useSSL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name.getMethodName();
|
||||
|
@ -120,7 +123,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
public void startBroker() throws Exception {
|
||||
// TODO Add SSL
|
||||
super.setUp();
|
||||
server = createServer(true, true);
|
||||
server = createServerForMQTT();
|
||||
addCoreConnector();
|
||||
addMQTTConnector();
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
|
@ -132,12 +135,19 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
server.waitForActivation(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private ActiveMQServer createServerForMQTT() throws Exception {
|
||||
Configuration defaultConfig = createDefaultConfig(true)
|
||||
.setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName()))
|
||||
.setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
|
||||
return createServer(true, defaultConfig);
|
||||
}
|
||||
|
||||
protected void addCoreConnector() throws Exception {
|
||||
// Overrides of this method can add additional configuration options or add multiple
|
||||
// MQTT transport connectors as needed, the port variable is always supposed to be
|
||||
// assigned the primary MQTT connector's port.
|
||||
|
||||
HashMap<String, Object> params = new HashMap<>();
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(TransportConstants.PORT_PROP_NAME, "" + 5445);
|
||||
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE");
|
||||
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
|
||||
|
@ -151,7 +161,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
// MQTT transport connectors as needed, the port variable is always supposed to be
|
||||
// assigned the primary MQTT connector's port.
|
||||
|
||||
HashMap<String, Object> params = new HashMap<>();
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(TransportConstants.PORT_PROP_NAME, "" + port);
|
||||
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
|
||||
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
|
||||
|
@ -336,4 +346,42 @@ public class MQTTTestSupport extends ActiveMQTestBase {
|
|||
return new X509Certificate[0];
|
||||
}
|
||||
}
|
||||
|
||||
public static class MQTTIncomingInterceptor implements MQTTInterceptor {
|
||||
|
||||
private static int messageCount = 0;
|
||||
|
||||
@Override
|
||||
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
|
||||
messageCount++;
|
||||
return true;
|
||||
}
|
||||
|
||||
public static void clear() {
|
||||
messageCount = 0;
|
||||
}
|
||||
|
||||
public static int getMessageCount() {
|
||||
return messageCount;
|
||||
}
|
||||
}
|
||||
|
||||
public static class MQTTOutoingInterceptor implements MQTTInterceptor {
|
||||
|
||||
private static int messageCount = 0;
|
||||
|
||||
@Override
|
||||
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
|
||||
messageCount++;
|
||||
return true;
|
||||
}
|
||||
|
||||
public static void clear() {
|
||||
messageCount = 0;
|
||||
}
|
||||
|
||||
public static int getMessageCount() {
|
||||
return messageCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue