This commit is contained in:
Dejan Bosanac 2016-04-19 16:01:45 +02:00
parent b027e65553
commit 34eb1ee959
3 changed files with 20 additions and 7 deletions

View File

@ -18,11 +18,13 @@ package org.apache.activemq.transport.ws;
import java.io.IOException; import java.io.IOException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.jms.pool.IntrospectionSupport;
import org.apache.activemq.transport.TransportSupport; import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
@ -42,6 +44,7 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
protected volatile int receiveCounter; protected volatile int receiveCounter;
protected final String remoteAddress; protected final String remoteAddress;
protected X509Certificate[] peerCertificates; protected X509Certificate[] peerCertificates;
private Map<String, Object> transportOptions;
public AbstractMQTTSocket(String remoteAddress) { public AbstractMQTTSocket(String remoteAddress) {
super(); super();
@ -132,14 +135,18 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
synchronized(this) { synchronized(this) {
if (protocolConverter == null) { if (protocolConverter == null) {
protocolConverter = new MQTTProtocolConverter(this, brokerService); protocolConverter = new MQTTProtocolConverter(this, brokerService);
IntrospectionSupport.setProperties(protocolConverter, transportOptions);
} }
} }
} }
return protocolConverter; return protocolConverter;
} }
protected boolean transportStartedAtLeastOnce() { protected boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0; return socketTransportStarted.getCount() == 0;
} }
public void setTransportOptions(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
} }

View File

@ -105,7 +105,9 @@ public class WSTransportServer extends WebTransportServerSupport {
} }
private Servlet createWSServlet() throws Exception { private Servlet createWSServlet() throws Exception {
return new WSServlet(); WSServlet servlet = new WSServlet();
servlet.setTransportOptions(transportOptions);
return servlet;
} }
private int getConnectorLocalPort() throws Exception { private int getConnectorLocalPort() throws Exception {

View File

@ -18,17 +18,14 @@
package org.apache.activemq.transport.ws.jetty9; package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.jms.pool.IntrospectionSupport;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.util.HttpTransportUtils; import org.apache.activemq.transport.util.HttpTransportUtils;
@ -51,6 +48,8 @@ public class WSServlet extends WebSocketServlet {
private final static Map<String, Integer> stompProtocols = new ConcurrentHashMap<> (); private final static Map<String, Integer> stompProtocols = new ConcurrentHashMap<> ();
private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<> (); private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<> ();
private Map<String, Object> transportOptions;
static { static {
stompProtocols.put("v12.stomp", 3); stompProtocols.put("v12.stomp", 3);
stompProtocols.put("v11.stomp", 2); stompProtocols.put("v11.stomp", 2);
@ -90,6 +89,7 @@ public class WSServlet extends WebSocketServlet {
if (isMqtt) { if (isMqtt) {
socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), "mqtt")); resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), "mqtt"));
((MQTTSocket)socket).setTransportOptions(new HashMap(transportOptions));
((MQTTSocket)socket).setPeerCertificates(req.getCertificates()); ((MQTTSocket)socket).setPeerCertificates(req.getCertificates());
} else { } else {
socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
@ -136,4 +136,8 @@ public class WSServlet extends WebSocketServlet {
this.priority = priority; this.priority = priority;
} }
} }
public void setTransportOptions(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
} }