Fixing mqtt link stealing default setting for auto transport
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-09-07 08:17:34 -04:00
parent 9ab94883a8
commit 88af1c70d9
2 changed files with 30 additions and 6 deletions

View File

@ -32,6 +32,8 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@ -97,14 +99,24 @@ public class AutoSslTransportFactory extends SslTransportFactory implements Brok
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format)
throws IOException {
if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
this.setAllowLinkStealing(true);
}
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format);
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format, detectedTransportFactory);
}
};
return server;
}
private void setDefaultLinkStealing(WireFormat format, TcpTransportServer server) {
if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
server.setAllowLinkStealing(true);
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@ -89,14 +90,25 @@ public class AutoTcpTransportFactory extends TcpTransportFactory implements Brok
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format)
throws IOException {
if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
this.setAllowLinkStealing(true);
}
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format);
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
setDefaultLinkStealing(format, this);
return super.createTransport(socket, format, detectedTransportFactory);
}
};
return server;
}
private void setDefaultLinkStealing(WireFormat format, TcpTransportServer server) {
if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
server.setAllowLinkStealing(true);
}
}
}