diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java index b9dfaba51b..f13b53738d 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java @@ -38,7 +38,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory { @Override protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new TcpTransportServer(this, location, serverSocketFactory) { + TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) { protected Transport createTransport(Socket socket, WireFormat format) throws IOException { MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket); if (context != null) { @@ -47,6 +47,8 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory { return transport; } }; + result.setAllowLinkStealing(true); + return result; } @Override diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java index f18e900387..52fa228a2b 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java @@ -49,11 +49,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok } protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new TcpTransportServer(this, location, serverSocketFactory) { + TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) { protected Transport createTransport(Socket socket, WireFormat format) throws IOException { return new MQTTNIOTransport(format, socket); } }; + result.setAllowLinkStealing(true); + return result; } protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java index de50cf215c..7b4696a026 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.transport.mqtt; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; @@ -25,9 +29,12 @@ import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; +import javax.net.ServerSocketFactory; + /** * A MQTT transport factory */ @@ -39,6 +46,12 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS return "mqtt"; } + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory); + result.setAllowLinkStealing(true); + return result; + } + @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new MQTTTransportFilter(transport, format, brokerContext);