diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java index 65d12c216f..fe0bd323fd 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java @@ -18,11 +18,13 @@ package org.apache.activemq.transport.ws; import java.io.IOException; import java.security.cert.X509Certificate; +import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.Command; +import org.apache.activemq.jms.pool.IntrospectionSupport; import org.apache.activemq.transport.TransportSupport; import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; @@ -42,6 +44,7 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT protected volatile int receiveCounter; protected final String remoteAddress; protected X509Certificate[] peerCertificates; + private Map transportOptions; public AbstractMQTTSocket(String remoteAddress) { super(); @@ -132,14 +135,18 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT synchronized(this) { if (protocolConverter == null) { protocolConverter = new MQTTProtocolConverter(this, brokerService); + IntrospectionSupport.setProperties(protocolConverter, transportOptions); } } } - return protocolConverter; } protected boolean transportStartedAtLeastOnce() { return socketTransportStarted.getCount() == 0; } + + public void setTransportOptions(Map transportOptions) { + this.transportOptions = transportOptions; + } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 51ce1bad94..4110fcbb42 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -105,7 +105,9 @@ public class WSTransportServer extends WebTransportServerSupport { } private Servlet createWSServlet() throws Exception { - return new WSServlet(); + WSServlet servlet = new WSServlet(); + servlet.setTransportOptions(transportOptions); + return servlet; } private int getConnectorLocalPort() throws Exception { diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java index cbcdc611ce..338be985c0 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -18,17 +18,14 @@ package org.apache.activemq.transport.ws.jetty9; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.activemq.jms.pool.IntrospectionSupport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.util.HttpTransportUtils; @@ -51,6 +48,8 @@ public class WSServlet extends WebSocketServlet { private final static Map stompProtocols = new ConcurrentHashMap<> (); private final static Map mqttProtocols = new ConcurrentHashMap<> (); + private Map transportOptions; + static { stompProtocols.put("v12.stomp", 3); stompProtocols.put("v11.stomp", 2); @@ -90,6 +89,7 @@ public class WSServlet extends WebSocketServlet { if (isMqtt) { socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), "mqtt")); + ((MQTTSocket)socket).setTransportOptions(new HashMap(transportOptions)); ((MQTTSocket)socket).setPeerCertificates(req.getCertificates()); } else { socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); @@ -136,4 +136,8 @@ public class WSServlet extends WebSocketServlet { this.priority = priority; } } + + public void setTransportOptions(Map transportOptions) { + this.transportOptions = transportOptions; + } }