This commit is contained in:
Rob Davies 2013-11-12 08:08:42 +00:00
parent 47d1985139
commit d2ddd1dcad
3 changed files with 19 additions and 2 deletions

View File

@ -38,7 +38,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
@Override @Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) { TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException { protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket); MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
if (context != null) { if (context != null) {
@ -47,6 +47,8 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
return transport; return transport;
} }
}; };
result.setAllowLinkStealing(true);
return result;
} }
@Override @Override

View File

@ -49,11 +49,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
} }
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) { TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException { protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new MQTTNIOTransport(format, socket); return new MQTTNIOTransport(format, socket);
} }
}; };
result.setAllowLinkStealing(true);
return result;
} }
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -25,9 +29,12 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import javax.net.ServerSocketFactory;
/** /**
* A <a href="http://mqtt.org/">MQTT</a> transport factory * A <a href="http://mqtt.org/">MQTT</a> transport factory
*/ */
@ -39,6 +46,12 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
return "mqtt"; return "mqtt";
} }
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
result.setAllowLinkStealing(true);
return result;
}
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerContext); transport = new MQTTTransportFilter(transport, format, brokerContext);