diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml
index d1b6c0207a..96a22e15dc 100755
--- a/activemq-http/pom.xml
+++ b/activemq-http/pom.xml
@@ -108,6 +108,13 @@
2.25.0
test
+
+ org.eclipse.jetty.websocket
+ websocket-server
+ ${jetty9-version}
+ provided
+ true
+
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
index 6c98cace6b..3ac922a1b1 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java
@@ -23,8 +23,6 @@ import org.apache.activemq.transport.https.Krb5AndCertsSslSocketConnector;
import org.apache.activemq.util.IntrospectionSupport;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ssl.SslConnector;
-import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class SecureSocketConnectorFactory extends SocketConnectorFactory {
@@ -44,76 +42,101 @@ public class SecureSocketConnectorFactory extends SocketConnectorFactory {
private String auth;
private SslContext context;
-
+ private SslContextFactory contextFactory;
+
+ public SecureSocketConnectorFactory() {
+
+ }
public SecureSocketConnectorFactory(SslContext context) {
this.context = context;
}
+ public SecureSocketConnectorFactory(SslContextFactory contextFactory) {
+ this.contextFactory = contextFactory;
+ }
+
@Override
public Connector createConnector(Server server) throws Exception {
- IntrospectionSupport.setProperties(this, getTransportOptions());
- SslConnector sslConnector;
- if (Krb5AndCertsSslSocketConnector.isKrb(auth)) {
- sslConnector = new Krb5AndCertsSslSocketConnector();
- ((Krb5AndCertsSslSocketConnector)sslConnector).setMode(auth);
- } else {
- sslConnector = new SslSelectChannelConnector();
+ if (getTransportOptions() != null) {
+ IntrospectionSupport.setProperties(this, getTransportOptions());
}
SSLContext sslContext = context == null ? null : context.getSSLContext();
// Get a reference to the current ssl context factory...
- SslContextFactory factory = sslConnector.getSslContextFactory();
- if (context != null) {
-
- // Should not be using this method since it does not use all of the values
- // from the passed SslContext instance.....
- factory.setSslContext(sslContext);
+ SslContextFactory factory;
+ if (contextFactory == null) {
+ factory = new SslContextFactory();
+ if (context != null) {
+ // Should not be using this method since it does not use all of the values
+ // from the passed SslContext instance.....
+ factory.setSslContext(sslContext);
+ } else {
+ if (keyStore != null) {
+ factory.setKeyStorePath(keyStore);
+ }
+ if (keyStorePassword != null) {
+ factory.setKeyStorePassword(keyStorePassword);
+ }
+ // if the keyPassword hasn't been set, default it to the
+ // key store password
+ if (keyPassword == null && keyStorePassword != null) {
+ factory.setKeyStorePassword(keyStorePassword);
+ }
+ if (keyStoreType != null) {
+ factory.setKeyStoreType(keyStoreType);
+ }
+ if (secureRandomCertficateAlgorithm != null) {
+ factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm);
+ }
+ if (keyCertificateAlgorithm != null) {
+ factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm);
+ }
+ if (trustCertificateAlgorithm != null) {
+ factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm);
+ }
+ if (protocol != null) {
+ factory.setProtocol(protocol);
+ }
+ if (trustStore != null) {
+ setTrustStore(factory, trustStore);
+ }
+ if (trustStorePassword != null) {
+ factory.setTrustStorePassword(trustStorePassword);
+ }
+ }
+ factory.setNeedClientAuth(needClientAuth);
+ factory.setWantClientAuth(wantClientAuth);
} else {
-
- if (keyStore != null) {
- factory.setKeyStorePath(keyStore);
- }
- if (keyStorePassword != null) {
- factory.setKeyStorePassword(keyStorePassword);
- }
- // if the keyPassword hasn't been set, default it to the
- // key store password
- if (keyPassword == null && keyStorePassword != null) {
- factory.setKeyStorePassword(keyStorePassword);
- }
- if (keyStoreType != null) {
- factory.setKeyStoreType(keyStoreType);
- }
- if (secureRandomCertficateAlgorithm != null) {
- factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm);
- }
- if (keyCertificateAlgorithm != null) {
- factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm);
- }
- if (trustCertificateAlgorithm != null) {
- factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm);
- }
- if (protocol != null) {
- factory.setProtocol(protocol);
- }
- if (trustStore != null) {
- factory.setTrustStore(trustStore);
- }
- if (trustStorePassword != null) {
- factory.setTrustStorePassword(trustStorePassword);
- }
-
+ factory = contextFactory;
}
- factory.setNeedClientAuth(needClientAuth);
- factory.setWantClientAuth(wantClientAuth);
-
- return sslConnector;
+
+ if ("KRB".equals(auth) || "BOTH".equals(auth)
+ && Server.getVersion().startsWith("8")) {
+ return new Krb5AndCertsSslSocketConnector(factory, auth);
+ } else {
+ try {
+ Class> cls = Class.forName("org.eclipse.jetty.server.ssl.SslSelectChannelConnector", true, Server.class.getClassLoader());
+ return (Connector)cls.getConstructor(SslContextFactory.class).newInstance(factory);
+ } catch (Throwable t) {
+ Class> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader());
+ Connector connector = (Connector)c.getConstructor(Server.class, SslContextFactory.class).newInstance(server, factory);
+ Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500);
+ connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500);
+ return connector;
+ }
+ }
+ }
+ private void setTrustStore(SslContextFactory factory, String trustStore2) throws Exception {
+ String mname = Server.getVersion().startsWith("8") ? "setTrustStore" : "setTrustStorePath";
+ factory.getClass().getMethod(mname, String.class).invoke(factory, trustStore2);
}
+
+
// Properties
// --------------------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
index 36b800b4ec..b982f18e6c 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java
@@ -21,15 +21,26 @@ import java.util.Map;
import org.apache.activemq.util.IntrospectionSupport;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
public class SocketConnectorFactory {
private Map transportOptions;
public Connector createConnector(Server server) throws Exception {
- SelectChannelConnector connector = new SelectChannelConnector();
- IntrospectionSupport.setProperties(connector, transportOptions, "");
+ Connector connector = null;
+
+ try {
+ connector = (Connector)Class.forName("org.eclipse.jetty.server.nio.SelectChannelConnector", true, Server.class.getClassLoader()).newInstance();
+ } catch (Throwable t) {
+ Class> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader());
+ connector = (Connector)c.getConstructor(Server.class).newInstance(server);
+ Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500);
+ connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500);
+ }
+ System.out.println(transportOptions);
+ if (transportOptions != null) {
+ IntrospectionSupport.setProperties(connector, transportOptions, "");
+ }
return connector;
}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
index 28c11a6419..a52424e623 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java
@@ -35,6 +35,18 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
super(location);
}
+ private void setConnectorProperty(String name, Class type, T value) throws Exception {
+ connector.getClass().getMethod("set" + name, type).invoke(connector, value);
+ }
+
+ protected void createServer() {
+ server = new Server();
+ try {
+ server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 500l);
+ } catch (Throwable t) {
+ //ignore, jetty 8.
+ }
+ }
public URI bind() throws Exception {
URI bind = getBindLocation();
@@ -44,9 +56,11 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
InetAddress addr = InetAddress.getByName(bindHost);
host = addr.getCanonicalHostName();
- connector.setHost(host);
- connector.setPort(bindAddress.getPort());
- connector.setServer(server);
+ setConnectorProperty("Host", String.class, host);
+ setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort());
+ if (Server.getVersion().startsWith("8")) {
+ connector.setServer(server);
+ }
server.addConnector(connector);
if (addr.isAnyLocalAddress()) {
host = InetAddressUtil.getLocalHostName();
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
index 96389ec7be..786e3aac8d 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.discovery.http;
import java.net.URI;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -27,13 +26,16 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service {
private HTTPDiscoveryAgent agent;
private Server server;
- private SelectChannelConnector connector;
private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet();
public void start() throws Exception {
URI uri = new URI(agent.getRegistryURL());
- server = new Server();
+ int port = 80;
+ if( uri.getPort() >=0 ) {
+ port = uri.getPort();
+ }
+ server = new Server(port);
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
context.setContextPath("/");
@@ -42,23 +44,9 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service {
context.addServlet(holder, "/*");
server.setHandler(context);
server.start();
-
- int port = 80;
- if( uri.getPort() >=0 ) {
- port = uri.getPort();
- }
-
- connector = new SelectChannelConnector();
- connector.setPort(port);
- server.addConnector(connector);
- connector.start();
}
public void stop() throws Exception {
- if( connector!=null ) {
- connector.stop();
- connector = null;
- }
if( server!=null ) {
server.stop();
server = null;
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
index 0c7ecd9cfe..8ae7874d9d 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
@@ -27,8 +27,8 @@ import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
@@ -77,7 +77,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
@Override
protected void doStart() throws Exception {
- server = new Server();
+ createServer();
if (connector == null) {
connector = socketConnectorFactory.createConnector(server);
}
@@ -96,8 +96,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
contextHandler.setAttribute("transportFactory", transportFactory);
contextHandler.setAttribute("transportOptions", transportOptions);
- GzipHandler gzipHandler = new GzipHandler();
- contextHandler.setHandler(gzipHandler);
+ addGzipHandler(contextHandler);
server.start();
@@ -105,8 +104,9 @@ public class HttpTransportServer extends WebTransportServerSupport {
// was set to zero so that we report the actual port we are listening on.
int port = boundTo.getPort();
- if (connector.getLocalPort() != -1) {
- port = connector.getLocalPort();
+ int p2 = getConnectorLocalPort();
+ if (p2 != -1) {
+ port = p2;
}
setConnectURI(new URI(boundTo.getScheme(),
@@ -118,6 +118,19 @@ public class HttpTransportServer extends WebTransportServerSupport {
boundTo.getFragment()));
}
+ private int getConnectorLocalPort() throws Exception {
+ return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector);
+ }
+ private void addGzipHandler(ServletContextHandler contextHandler) throws Exception {
+ Handler handler = null;
+ try {
+ handler = (Handler)Class.forName("org.eclipse.jetty.server.handler.GzipHandler", true, Handler.class.getClassLoader()).newInstance();
+ } catch (Throwable t) {
+ handler = (Handler)Class.forName("org.eclipse.jetty.servlets.gzip.GzipHandler", true, Handler.class.getClassLoader()).newInstance();
+ }
+ contextHandler.setHandler(handler);
+ }
+
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
Server temp = server;
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
index 858c9adb8f..cf3612294a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java
@@ -68,6 +68,14 @@ public class Krb5AndCertsSslSocketConnector extends SslSocketConnector {
useCerts = true;
setPasswords();
}
+ public Krb5AndCertsSslSocketConnector(SslContextFactory f, String auth) {
+ // By default, stick to cert based authentication
+ super(f);
+ useKrb = false;
+ useCerts = true;
+ setPasswords();
+ setMode(auth);
+ }
public static boolean isKrb(String mode) {
return mode == MODE.KRB.toString() || mode == MODE.BOTH.toString();
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
index 4b75c9a3de..e26027deef 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java
@@ -49,7 +49,7 @@ public class WSTransportServer extends WebTransportServerSupport {
@Override
protected void doStart() throws Exception {
- server = new Server();
+ createServer();
if (connector == null) {
connector = socketConnectorFactory.createConnector(server);
@@ -69,7 +69,11 @@ public class WSTransportServer extends WebTransportServerSupport {
}
}
- holder.setServlet(new WSServlet());
+ if (Server.getVersion().startsWith("8")) {
+ holder.setServlet(new org.apache.activemq.transport.ws.jetty8.WSServlet());
+ } else {
+ holder.setServlet(new org.apache.activemq.transport.ws.jetty9.WSServlet());
+ }
contextHandler.addServlet(holder, "/");
contextHandler.setAttribute("acceptListener", getAcceptListener());
@@ -79,9 +83,9 @@ public class WSTransportServer extends WebTransportServerSupport {
// Update the Connect To URI with our actual location in case the configured port
// was set to zero so that we report the actual port we are listening on.
- int port = boundTo.getPort();
- if (connector.getLocalPort() != -1) {
- port = connector.getLocalPort();
+ int port = getConnectorLocalPort();
+ if (port == -1) {
+ port = boundTo.getPort();
}
setConnectURI(new URI(boundTo.getScheme(),
@@ -95,6 +99,10 @@ public class WSTransportServer extends WebTransportServerSupport {
LOG.info("Listening for connections at {}", getConnectURI());
}
+ private int getConnectorLocalPort() throws Exception {
+ return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector);
+ }
+
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
Server temp = server;
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java
similarity index 99%
rename from activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java
index 047c459ddb..58e9134f42 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.ws;
+package org.apache.activemq.transport.ws.jetty8;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
similarity index 98%
rename from activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
index b0da09aea3..dba3ca98c9 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport.ws;
+package org.apache.activemq.transport.ws.jetty8;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java
similarity index 97%
rename from activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java
rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java
index d0ed22da92..d0f7b19d93 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.activemq.transport.ws;
+package org.apache.activemq.transport.ws.jetty8;
import java.io.IOException;
import javax.servlet.ServletException;
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
new file mode 100644
index 0000000000..4d7dac3a2e
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws.jetty9;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTTransport;
+import org.apache.activemq.transport.mqtt.MQTTWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+
+public class MQTTSocket extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
+ Session session;
+ MQTTProtocolConverter protocolConverter = null;
+ MQTTWireFormat wireFormat = new MQTTWireFormat();
+ private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+ private BrokerService brokerService;
+
+ private MQTTProtocolConverter getProtocolConverter() {
+ if( protocolConverter == null ) {
+ protocolConverter = new MQTTProtocolConverter(this, brokerService);
+ }
+ return protocolConverter;
+ }
+
+ protected void doStart() throws Exception {
+ socketTransportStarted.countDown();
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ }
+
+ private boolean transportStartedAtLeastOnce() {
+ return socketTransportStarted.getCount() == 0;
+ }
+
+ @Override
+ public int getReceiveCounter() {
+ return 0;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return "MQTTSocket_" + this.hashCode();
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ try {
+ getProtocolConverter().onActiveMQCommand((Command) command);
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ @Override
+ public void sendToActiveMQ(Command command) {
+ doConsume(command);
+ }
+
+ @Override
+ public void sendToMQTT(MQTTFrame command) throws IOException {
+ ByteSequence bytes = wireFormat.marshal(command);
+ session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
+ }
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return new X509Certificate[0];
+ }
+
+ @Override
+ public MQTTInactivityMonitor getInactivityMonitor() {
+ return null;
+ }
+
+ @Override
+ public MQTTWireFormat getWireFormat() {
+ return wireFormat;
+ }
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] bytes, int offset, int length) {
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for StompSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+ try {
+ MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
+ getProtocolConverter().onMQTTCommand(frame);
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ @Override
+ public void onWebSocketClose(int arg0, String arg1) {
+ try {
+ getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
+ } catch (Exception e) {
+ LOG.warn("Failed to close WebSocket", e);
+ }
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public void onWebSocketError(Throwable arg0) {
+
+ }
+
+ @Override
+ public void onWebSocketText(String arg0) {
+ }
+}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
new file mode 100644
index 0000000000..811f228faf
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws.jetty9;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.stomp.ProtocolConverter;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompInactivityMonitor;
+import org.apache.activemq.transport.stomp.StompTransport;
+import org.apache.activemq.transport.stomp.StompWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements web socket and mediates between servlet and the broker
+ */
+class StompSocket extends TransportSupport implements WebSocketListener, StompTransport {
+ private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
+
+ Session session;
+ ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
+ StompWireFormat wireFormat = new StompWireFormat();
+ private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+ private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
+
+ private boolean transportStartedAtLeastOnce() {
+ return socketTransportStarted.getCount() == 0;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ socketTransportStarted.countDown();
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ }
+
+ @Override
+ public int getReceiveCounter() {
+ return 0;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return "StompSocket_" + this.hashCode();
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ try {
+ protocolConverter.onActiveMQCommand((Command)command);
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ @Override
+ public void sendToActiveMQ(Command command) {
+ doConsume(command);
+ }
+
+ @Override
+ public void sendToStomp(StompFrame command) throws IOException {
+ session.getRemote().sendString(command.format());
+ }
+
+ @Override
+ public StompInactivityMonitor getInactivityMonitor() {
+ return stompInactivityMonitor;
+ }
+
+ @Override
+ public StompWireFormat getWireFormat() {
+ return this.wireFormat;
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
+ }
+
+ @Override
+ public void onWebSocketClose(int arg0, String arg1) {
+ try {
+ protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
+ } catch (Exception e) {
+ LOG.warn("Failed to close WebSocket", e);
+ }
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public void onWebSocketError(Throwable arg0) {
+ }
+
+ @Override
+ public void onWebSocketText(String data) {
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for StompSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+ try {
+ protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
new file mode 100644
index 0000000000..15927b1a98
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.ws.jetty9;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Handle connection upgrade requests and creates web sockets
+ */
+public class WSServlet extends WebSocketServlet {
+ private static final long serialVersionUID = -4716657876092884139L;
+
+ private TransportAcceptListener listener;
+
+ public void init() throws ServletException {
+ super.init();
+ listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
+ if (listener == null) {
+ throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
+ }
+ }
+
+ protected void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException ,IOException {
+ getServletContext().getNamedDispatcher("default").forward(request,response);
+ }
+
+
+ public void configure(WebSocketServletFactory factory) {
+ factory.setCreator(new WebSocketCreator() {
+ @Override
+ public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+ WebSocketListener socket;
+ if (req.getSubProtocols().contains("mqtt")) {
+ socket = new MQTTSocket();
+ } else {
+ socket = new StompSocket();
+ }
+ return socket;
+ }
+ });
+
+ }
+}
+
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
index 92bc1cbb05..140356e846 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
@@ -32,16 +32,18 @@ import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.SpringSslContext;
+import org.apache.activemq.transport.SocketConnectorFactory;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.webapp.WebAppContext;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+
import org.openqa.selenium.By;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
@@ -97,7 +99,9 @@ public class WSTransportTest {
Server server = new Server();
Connector connector = createJettyConnector(server);
- connector.setServer(server);
+ if (Server.getVersion().startsWith("8")) {
+ connector.setServer(server);
+ }
WebAppContext context = new WebAppContext();
context.setResourceBase("src/test/webapp");
@@ -129,10 +133,10 @@ public class WSTransportTest {
return proxyPort;
}
- protected Connector createJettyConnector(Server server) {
- SelectChannelConnector connector = new SelectChannelConnector();
- connector.setPort(getProxyPort());
- return connector;
+ protected Connector createJettyConnector(Server server) throws Exception {
+ Connector c = new SocketConnectorFactory().createConnector(server);
+ c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
+ return c;
}
protected void stopBroker() throws Exception {
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
index ef6114092f..36b33f6319 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java
@@ -16,23 +16,23 @@
*/
package org.apache.activemq.transport.wss;
+import org.apache.activemq.transport.SecureSocketConnectorFactory;
import org.apache.activemq.transport.ws.WSTransportTest;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ssl.SslSocketConnector;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
public class WSSTransportTest extends WSTransportTest {
@Override
- protected Connector createJettyConnector(Server server) {
- SslSocketConnector sslConnector = new SslSocketConnector();
- SslContextFactory contextFactory = sslConnector.getSslContextFactory();
- contextFactory.setKeyStorePath("src/test/resources/server.keystore");
- contextFactory.setKeyStorePassword("password");
- contextFactory.setTrustStore("src/test/resources/client.keystore");
- contextFactory.setTrustStorePassword("password");
- sslConnector.setPort(getProxyPort());
- return sslConnector;
+ protected Connector createJettyConnector(Server server) throws Exception {
+ SecureSocketConnectorFactory sscf = new SecureSocketConnectorFactory();
+ sscf.setKeyStore("src/test/resources/server.keystore");
+ sscf.setKeyStorePassword("password");
+ sscf.setTrustStore("src/test/resources/client.keystore");
+ sscf.setTrustStorePassword("password");
+
+ Connector c = sscf.createConnector(server);
+ c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
+ return c;
}
@Override
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index c56877a555..74be454f3e 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -52,7 +52,7 @@
com.fasterxml.jackson*;resolution:=optional,
org.codehaus.jettison*;resolution:=optional,
org.jasypt*;resolution:=optional,
- org.eclipse.jetty*;resolution:=optional,
+ org.eclipse.jetty*;resolution:=optional;version="[8.1,10)",
org.apache.zookeeper*;resolution:=optional,
org.fusesource.leveldbjni*;resolution:=optional,
org.fusesource.hawtjni*;resolution:=optional,
diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml
index c14c40f2fb..8002444c26 100755
--- a/activemq-web-console/pom.xml
+++ b/activemq-web-console/pom.xml
@@ -32,6 +32,7 @@
8080
+ org.mortbay.jetty
@@ -51,7 +52,7 @@
- org.mortbay.jetty
+ ${jetty.maven.groupid}
jetty-maven-plugin
${jetty-version}