ARTEMIS-2985 - don't block netty threads for mqtt protocol actions

This commit is contained in:
gtully 2020-11-12 17:17:20 +00:00
parent fda4a87859
commit 9675ecae42
6 changed files with 865 additions and 28 deletions

View File

@ -104,8 +104,9 @@ public class MQTTConnectionManager {
}
session.getConnection().setConnected(true);
session.start();
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
// ensure we don't publish before the CONNACK
session.start();
}
}

View File

@ -41,6 +41,7 @@ import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.utils.actors.Actor;
/**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
@ -65,9 +66,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
private boolean stopped = false;
private final Actor<MqttMessage> mqttMessageActor;
public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) {
this.server = server;
this.protocolManager = protocolManager;
this.mqttMessageActor = new Actor<>(server.getExecutorFactory().getExecutor(), this::act);
}
void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception {
@ -82,23 +86,35 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
connection.dataReceived();
MqttMessage message = (MqttMessage) msg;
// Disconnect if Netty codec failed to decode the stream.
if (message.decoderResult().isFailure()) {
log.debug("Bad Message Disconnecting Client.");
disconnect(true);
return;
}
if (this.ctx == null) {
this.ctx = ctx;
}
// let netty handle keepalive response
if (MqttMessageType.PINGREQ == message.fixedHeader().messageType()) {
handlePingreq();
} else {
mqttMessageActor.act(message);
}
}
public void act(MqttMessage message) {
try {
if (stopped) {
disconnect(true);
return;
}
MqttMessage message = (MqttMessage) msg;
// Disconnect if Netty codec failed to decode the stream.
if (message.decoderResult().isFailure()) {
log.debug("Bad Message Disconnecting Client.");
disconnect(true);
return;
}
connection.dataReceived();
if (AuditLogger.isAnyLoggingEnabled()) {
AuditLogger.setRemoteAddress(connection.getRemoteAddress());
}
@ -113,7 +129,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
switch (message.fixedHeader().messageType()) {
case CONNECT:
handleConnect((MqttConnectMessage) message, ctx);
handleConnect((MqttConnectMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
@ -136,9 +152,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case PINGREQ:
handlePingreq();
break;
case DISCONNECT:
disconnect(false);
break;
@ -150,10 +163,10 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
disconnect(true);
}
} catch (Exception e) {
log.debug("Error processing Control Packet, Disconnecting Client", e);
log.warn("Error processing Control Packet, Disconnecting Client", e);
disconnect(true);
} finally {
ReferenceCountUtil.release(msg);
ReferenceCountUtil.release(message);
}
}
@ -162,8 +175,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
*
* @param connect
*/
void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
void handleConnect(MqttConnectMessage connect) throws Exception {
connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;
String clientId = connect.payload().clientIdentifier();

View File

@ -244,11 +244,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
messageQueue.addConsumer(this);
}
this.supportLargeMessage = supportLargeMessage;
if (credits != null) {
@ -261,6 +256,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.server = server;
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
messageQueue.addConsumer(this);
}
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {

View File

@ -19,18 +19,23 @@ package org.apache.activemq.artemis.tests.integration.mqtt;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class MQTTConnnectionCleanupTest extends MQTTTestSupport {
@ -82,4 +87,36 @@ public class MQTTConnnectionCleanupTest extends MQTTTestSupport {
connection.disconnect();
}
@Test(timeout = 30 * 1000)
public void testSlowSubscribeWontBlockKeepAlive() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setKeepAlive((short) 1);
mqtt.setCleanSession(true);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("MQTT");
assertEquals(1, acceptor.getConnections().size());
server.getConfiguration().getBrokerBindingPlugins().add(new ActiveMQServerBindingPlugin() {
@Override
public void beforeAddBinding(Binding binding) throws ActiveMQException {
// take a little nap
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ok) {
}
}
});
// this should take a while...but should succeed.
connection.subscribe(new Topic[]{new Topic("T.x", QoS.AT_LEAST_ONCE)});
assertEquals(1, acceptor.getConnections().size());
connection.disconnect();
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@ -118,7 +119,7 @@ public class MQTTSecurityCRLTest extends ActiveMQTestBase {
* keytool -import -trustcacerts -alias trust_key -file ca.crt -keystore truststore.jks
*/
@Test(expected = SSLException.class)
@Test
public void crlRevokedTest() throws Exception {
ActiveMQServer server1 = initServer();
@ -144,7 +145,9 @@ public class MQTTSecurityCRLTest extends ActiveMQTestBase {
Message message1 = connection1.receive(5, TimeUnit.SECONDS);
assertEquals(payload1, new String(message1.getPayload()));
fail("We expect an exception of some sort!");
} catch (SSLException expected) {
} catch (EOFException canHappenAlso) {
} finally {
if (connection1 != null) {
connection1.disconnect();

View File

@ -17,16 +17,36 @@
package org.apache.activemq.artemis.tests.smoke.replicationflow;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -35,6 +55,9 @@ import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -71,7 +94,7 @@ public class SoakPagingTest extends SmokeTestBase {
@Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
return Arrays.asList(new Object[][]{{"MQTT", "topic", false}, {"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
{"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}});
}
@ -99,6 +122,8 @@ public class SoakPagingTest extends SmokeTestBase {
private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" + uri + ")");
} else if (protocol.toUpperCase().equals("MQTT")) {
return new MQTTCF();
} else if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
@ -314,3 +339,761 @@ public class SoakPagingTest extends SmokeTestBase {
}
}
}
class MQTTCF implements ConnectionFactory, Connection, Session, Topic, MessageConsumer, MessageProducer {
final MQTT mqtt = new MQTT();
private String topicName;
private BlockingConnection blockingConnection;
private boolean consumer;
MQTTCF() {
try {
mqtt.setHost("localhost", 61616);
} catch (Exception ignored) {
}
}
@Override
public Connection createConnection() throws JMSException {
return new MQTTCF();
}
@Override
public Connection createConnection(String userName, String password) throws JMSException {
MQTTCF result = new MQTTCF();
result.mqtt.setUserName(userName);
result.mqtt.setPassword(password);
return result;
}
@Override
public JMSContext createContext() {
return null;
}
@Override
public JMSContext createContext(int sessionMode) {
return null;
}
@Override
public JMSContext createContext(String userName, String password) {
return null;
}
@Override
public JMSContext createContext(String userName, String password, int sessionMode) {
return null;
}
@Override
public String getMessageSelector() throws JMSException {
return null;
}
@Override
public Message receive() throws JMSException {
return null;
}
@Override
public Message receive(long timeout) throws JMSException {
final org.fusesource.mqtt.client.Message message;
try {
message = blockingConnection.receive(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
if (message != null) {
return new TMessage(new String(message.getPayload()));
}
return null;
}
@Override
public Message receiveNoWait() throws JMSException {
return null;
}
@Override
public void setDisableMessageID(boolean value) throws JMSException {
}
@Override
public boolean getDisableMessageID() throws JMSException {
return false;
}
@Override
public void setDisableMessageTimestamp(boolean value) throws JMSException {
}
@Override
public boolean getDisableMessageTimestamp() throws JMSException {
return false;
}
@Override
public void setDeliveryMode(int deliveryMode) throws JMSException {
}
@Override
public int getDeliveryMode() throws JMSException {
return 0;
}
@Override
public void setPriority(int defaultPriority) throws JMSException {
}
@Override
public int getPriority() throws JMSException {
return 0;
}
@Override
public void setTimeToLive(long timeToLive) throws JMSException {
}
@Override
public long getTimeToLive() throws JMSException {
return 0;
}
@Override
public Destination getDestination() throws JMSException {
return null;
}
@Override
public void send(Message message) throws JMSException {
try {
blockingConnection.publish(topicName, message.getBody(String.class).getBytes(), QoS.EXACTLY_ONCE, false);
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
}
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
}
@Override
public void send(Destination destination, Message message) throws JMSException {
}
@Override
public void send(Message message, CompletionListener completionListener) throws JMSException {
}
@Override
public void send(Destination destination,
Message message,
int deliveryMode,
int priority,
long timeToLive) throws JMSException {
}
@Override
public void send(Destination destination,
Message message,
CompletionListener completionListener) throws JMSException {
}
@Override
public void send(Message message,
int deliveryMode,
int priority,
long timeToLive,
CompletionListener completionListener) throws JMSException {
}
@Override
public void send(Destination destination,
Message message,
int deliveryMode,
int priority,
long timeToLive,
CompletionListener completionListener) throws JMSException {
}
@Override
public long getDeliveryDelay() throws JMSException {
return 0;
}
@Override
public void setDeliveryDelay(long deliveryDelay) throws JMSException {
}
@Override
public BytesMessage createBytesMessage() throws JMSException {
return null;
}
@Override
public MapMessage createMapMessage() throws JMSException {
return null;
}
@Override
public Message createMessage() throws JMSException {
return null;
}
@Override
public ObjectMessage createObjectMessage() throws JMSException {
return null;
}
@Override
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
return null;
}
@Override
public StreamMessage createStreamMessage() throws JMSException {
return null;
}
@Override
public TextMessage createTextMessage() throws JMSException {
return null;
}
@Override
public TextMessage createTextMessage(String text) throws JMSException {
return new TMessage(text);
}
@Override
public boolean getTransacted() throws JMSException {
return false;
}
@Override
public int getAcknowledgeMode() throws JMSException {
return 0;
}
@Override
public void commit() throws JMSException {
}
@Override
public void rollback() throws JMSException {
}
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
return this;
}
@Override
public Session createSession(int sessionMode) throws JMSException {
return null;
}
@Override
public Session createSession() throws JMSException {
return null;
}
@Override
public String getClientID() throws JMSException {
return null;
}
@Override
public void setClientID(String clientID) throws JMSException {
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return null;
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return null;
}
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
}
@Override
public void start() throws JMSException {
blockingConnection = mqtt.blockingConnection();
try {
blockingConnection.connect();
if (consumer) {
blockingConnection.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic(topicName, QoS.EXACTLY_ONCE)});
}
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
}
@Override
public void stop() throws JMSException {
}
@Override
public void close() throws JMSException {
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return null;
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return null;
}
@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return null;
}
@Override
public ConnectionConsumer createSharedConnectionConsumer(Topic topic,
String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
return null;
}
@Override
public void recover() throws JMSException {
}
@Override
public MessageListener getMessageListener() throws JMSException {
return null;
}
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
}
@Override
public void run() {
}
@Override
public MessageProducer createProducer(Destination destination) throws JMSException {
return this;
}
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
consumer = true;
return this;
}
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
return null;
}
@Override
public MessageConsumer createConsumer(Destination destination,
String messageSelector,
boolean NoLocal) throws JMSException {
return null;
}
@Override
public Queue createQueue(String queueName) throws JMSException {
return null;
}
@Override
public Topic createTopic(String topicName) throws JMSException {
this.topicName = topicName;
return this;
}
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
return null;
}
@Override
public TopicSubscriber createDurableSubscriber(Topic topic,
String name,
String messageSelector,
boolean noLocal) throws JMSException {
return null;
}
@Override
public QueueBrowser createBrowser(Queue queue) throws JMSException {
return null;
}
@Override
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
return null;
}
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
return null;
}
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
return null;
}
@Override
public void unsubscribe(String name) throws JMSException {
}
@Override
public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
return null;
}
@Override
public MessageConsumer createSharedConsumer(Topic topic,
String sharedSubscriptionName,
String messageSelector) throws JMSException {
return null;
}
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
return null;
}
@Override
public MessageConsumer createDurableConsumer(Topic topic,
String name,
String messageSelector,
boolean noLocal) throws JMSException {
return null;
}
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
return null;
}
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic,
String name,
String messageSelector) throws JMSException {
return null;
}
@Override
public String getTopicName() throws JMSException {
return topicName;
}
private class TMessage implements Message, TextMessage {
final String s;
TMessage(String s) {
this.s = s;
}
@Override
public String getJMSMessageID() throws JMSException {
return null;
}
@Override
public void setJMSMessageID(String id) throws JMSException {
}
@Override
public long getJMSTimestamp() throws JMSException {
return 0;
}
@Override
public void setJMSTimestamp(long timestamp) throws JMSException {
}
@Override
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
return new byte[0];
}
@Override
public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
}
@Override
public void setJMSCorrelationID(String correlationID) throws JMSException {
}
@Override
public String getJMSCorrelationID() throws JMSException {
return null;
}
@Override
public Destination getJMSReplyTo() throws JMSException {
return null;
}
@Override
public void setJMSReplyTo(Destination replyTo) throws JMSException {
}
@Override
public Destination getJMSDestination() throws JMSException {
return null;
}
@Override
public void setJMSDestination(Destination destination) throws JMSException {
}
@Override
public int getJMSDeliveryMode() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryMode(int deliveryMode) throws JMSException {
}
@Override
public boolean getJMSRedelivered() throws JMSException {
return false;
}
@Override
public void setJMSRedelivered(boolean redelivered) throws JMSException {
}
@Override
public String getJMSType() throws JMSException {
return null;
}
@Override
public void setJMSType(String type) throws JMSException {
}
@Override
public long getJMSExpiration() throws JMSException {
return 0;
}
@Override
public void setJMSExpiration(long expiration) throws JMSException {
}
@Override
public int getJMSPriority() throws JMSException {
return 0;
}
@Override
public void setJMSPriority(int priority) throws JMSException {
}
@Override
public void clearProperties() throws JMSException {
}
@Override
public boolean propertyExists(String name) throws JMSException {
return false;
}
@Override
public boolean getBooleanProperty(String name) throws JMSException {
return false;
}
@Override
public byte getByteProperty(String name) throws JMSException {
return 0;
}
@Override
public short getShortProperty(String name) throws JMSException {
return 0;
}
@Override
public int getIntProperty(String name) throws JMSException {
return 0;
}
@Override
public long getLongProperty(String name) throws JMSException {
return 0;
}
@Override
public float getFloatProperty(String name) throws JMSException {
return 0;
}
@Override
public double getDoubleProperty(String name) throws JMSException {
return 0;
}
@Override
public String getStringProperty(String name) throws JMSException {
return null;
}
@Override
public Object getObjectProperty(String name) throws JMSException {
return null;
}
@Override
public Enumeration getPropertyNames() throws JMSException {
return null;
}
@Override
public void setBooleanProperty(String name, boolean value) throws JMSException {
}
@Override
public void setByteProperty(String name, byte value) throws JMSException {
}
@Override
public void setShortProperty(String name, short value) throws JMSException {
}
@Override
public void setIntProperty(String name, int value) throws JMSException {
}
@Override
public void setLongProperty(String name, long value) throws JMSException {
}
@Override
public void setFloatProperty(String name, float value) throws JMSException {
}
@Override
public void setDoubleProperty(String name, double value) throws JMSException {
}
@Override
public void setStringProperty(String name, String value) throws JMSException {
}
@Override
public void setObjectProperty(String name, Object value) throws JMSException {
}
@Override
public void acknowledge() throws JMSException {
}
@Override
public void clearBody() throws JMSException {
}
@Override
public long getJMSDeliveryTime() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
}
@Override
public <T> T getBody(Class<T> c) throws JMSException {
return (T) s;
}
@Override
public boolean isBodyAssignableTo(Class c) throws JMSException {
return false;
}
@Override
public void setText(String string) throws JMSException {
}
@Override
public String getText() throws JMSException {
return s;
}
}
}