From 3f826250775c6c022ebad1de57db11770c9cfe1a Mon Sep 17 00:00:00 2001 From: Daniel Kulp Date: Tue, 13 Jan 2015 12:47:53 -0500 Subject: [PATCH] [AMQ-5517] Runtime support for Jetty 9. Build/compile with Jetty8, but tests pass with Jetty 9 for runtime level support. --- activemq-http/pom.xml | 7 + .../SecureSocketConnectorFactory.java | 131 ++++++++------ .../transport/SocketConnectorFactory.java | 17 +- .../transport/WebTransportServerSupport.java | 20 ++- .../discovery/http/EmbeddedJettyServer.java | 22 +-- .../transport/http/HttpTransportServer.java | 25 ++- .../https/Krb5AndCertsSslSocketConnector.java | 8 + .../transport/ws/WSTransportServer.java | 18 +- .../transport/ws/{ => jetty8}/MQTTSocket.java | 2 +- .../ws/{ => jetty8}/StompSocket.java | 2 +- .../transport/ws/{ => jetty8}/WSServlet.java | 2 +- .../transport/ws/jetty9/MQTTSocket.java | 161 ++++++++++++++++++ .../transport/ws/jetty9/StompSocket.java | 142 +++++++++++++++ .../transport/ws/jetty9/WSServlet.java | 72 ++++++++ .../transport/ws/WSTransportTest.java | 16 +- .../transport/wss/WSSTransportTest.java | 22 +-- activemq-osgi/pom.xml | 2 +- activemq-web-console/pom.xml | 3 +- 18 files changed, 562 insertions(+), 110 deletions(-) rename activemq-http/src/main/java/org/apache/activemq/transport/ws/{ => jetty8}/MQTTSocket.java (99%) rename activemq-http/src/main/java/org/apache/activemq/transport/ws/{ => jetty8}/StompSocket.java (98%) rename activemq-http/src/main/java/org/apache/activemq/transport/ws/{ => jetty8}/WSServlet.java (97%) create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java create mode 100644 activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index d1b6c0207a..96a22e15dc 100755 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -108,6 +108,13 @@ 2.25.0 test + + org.eclipse.jetty.websocket + websocket-server + ${jetty9-version} + provided + true + diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java index 6c98cace6b..3ac922a1b1 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java @@ -23,8 +23,6 @@ import org.apache.activemq.transport.https.Krb5AndCertsSslSocketConnector; import org.apache.activemq.util.IntrospectionSupport; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ssl.SslConnector; -import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.util.ssl.SslContextFactory; public class SecureSocketConnectorFactory extends SocketConnectorFactory { @@ -44,76 +42,101 @@ public class SecureSocketConnectorFactory extends SocketConnectorFactory { private String auth; private SslContext context; - + private SslContextFactory contextFactory; + + public SecureSocketConnectorFactory() { + + } public SecureSocketConnectorFactory(SslContext context) { this.context = context; } + public SecureSocketConnectorFactory(SslContextFactory contextFactory) { + this.contextFactory = contextFactory; + } + @Override public Connector createConnector(Server server) throws Exception { - IntrospectionSupport.setProperties(this, getTransportOptions()); - SslConnector sslConnector; - if (Krb5AndCertsSslSocketConnector.isKrb(auth)) { - sslConnector = new Krb5AndCertsSslSocketConnector(); - ((Krb5AndCertsSslSocketConnector)sslConnector).setMode(auth); - } else { - sslConnector = new SslSelectChannelConnector(); + if (getTransportOptions() != null) { + IntrospectionSupport.setProperties(this, getTransportOptions()); } SSLContext sslContext = context == null ? null : context.getSSLContext(); // Get a reference to the current ssl context factory... - SslContextFactory factory = sslConnector.getSslContextFactory(); - if (context != null) { - - // Should not be using this method since it does not use all of the values - // from the passed SslContext instance..... - factory.setSslContext(sslContext); + SslContextFactory factory; + if (contextFactory == null) { + factory = new SslContextFactory(); + if (context != null) { + // Should not be using this method since it does not use all of the values + // from the passed SslContext instance..... + factory.setSslContext(sslContext); + } else { + if (keyStore != null) { + factory.setKeyStorePath(keyStore); + } + if (keyStorePassword != null) { + factory.setKeyStorePassword(keyStorePassword); + } + // if the keyPassword hasn't been set, default it to the + // key store password + if (keyPassword == null && keyStorePassword != null) { + factory.setKeyStorePassword(keyStorePassword); + } + if (keyStoreType != null) { + factory.setKeyStoreType(keyStoreType); + } + if (secureRandomCertficateAlgorithm != null) { + factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm); + } + if (keyCertificateAlgorithm != null) { + factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm); + } + if (trustCertificateAlgorithm != null) { + factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm); + } + if (protocol != null) { + factory.setProtocol(protocol); + } + if (trustStore != null) { + setTrustStore(factory, trustStore); + } + if (trustStorePassword != null) { + factory.setTrustStorePassword(trustStorePassword); + } + } + factory.setNeedClientAuth(needClientAuth); + factory.setWantClientAuth(wantClientAuth); } else { - - if (keyStore != null) { - factory.setKeyStorePath(keyStore); - } - if (keyStorePassword != null) { - factory.setKeyStorePassword(keyStorePassword); - } - // if the keyPassword hasn't been set, default it to the - // key store password - if (keyPassword == null && keyStorePassword != null) { - factory.setKeyStorePassword(keyStorePassword); - } - if (keyStoreType != null) { - factory.setKeyStoreType(keyStoreType); - } - if (secureRandomCertficateAlgorithm != null) { - factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm); - } - if (keyCertificateAlgorithm != null) { - factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm); - } - if (trustCertificateAlgorithm != null) { - factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm); - } - if (protocol != null) { - factory.setProtocol(protocol); - } - if (trustStore != null) { - factory.setTrustStore(trustStore); - } - if (trustStorePassword != null) { - factory.setTrustStorePassword(trustStorePassword); - } - + factory = contextFactory; } - factory.setNeedClientAuth(needClientAuth); - factory.setWantClientAuth(wantClientAuth); - - return sslConnector; + + if ("KRB".equals(auth) || "BOTH".equals(auth) + && Server.getVersion().startsWith("8")) { + return new Krb5AndCertsSslSocketConnector(factory, auth); + } else { + try { + Class cls = Class.forName("org.eclipse.jetty.server.ssl.SslSelectChannelConnector", true, Server.class.getClassLoader()); + return (Connector)cls.getConstructor(SslContextFactory.class).newInstance(factory); + } catch (Throwable t) { + Class c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader()); + Connector connector = (Connector)c.getConstructor(Server.class, SslContextFactory.class).newInstance(server, factory); + Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500); + connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500); + return connector; + } + } + } + private void setTrustStore(SslContextFactory factory, String trustStore2) throws Exception { + String mname = Server.getVersion().startsWith("8") ? "setTrustStore" : "setTrustStorePath"; + factory.getClass().getMethod(mname, String.class).invoke(factory, trustStore2); } + + // Properties // -------------------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java index 36b800b4ec..b982f18e6c 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java @@ -21,15 +21,26 @@ import java.util.Map; import org.apache.activemq.util.IntrospectionSupport; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; public class SocketConnectorFactory { private Map transportOptions; public Connector createConnector(Server server) throws Exception { - SelectChannelConnector connector = new SelectChannelConnector(); - IntrospectionSupport.setProperties(connector, transportOptions, ""); + Connector connector = null; + + try { + connector = (Connector)Class.forName("org.eclipse.jetty.server.nio.SelectChannelConnector", true, Server.class.getClassLoader()).newInstance(); + } catch (Throwable t) { + Class c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader()); + connector = (Connector)c.getConstructor(Server.class).newInstance(server); + Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500); + connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500); + } + System.out.println(transportOptions); + if (transportOptions != null) { + IntrospectionSupport.setProperties(connector, transportOptions, ""); + } return connector; } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java index 28c11a6419..a52424e623 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java @@ -35,6 +35,18 @@ abstract public class WebTransportServerSupport extends TransportServerSupport { super(location); } + private void setConnectorProperty(String name, Class type, T value) throws Exception { + connector.getClass().getMethod("set" + name, type).invoke(connector, value); + } + + protected void createServer() { + server = new Server(); + try { + server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 500l); + } catch (Throwable t) { + //ignore, jetty 8. + } + } public URI bind() throws Exception { URI bind = getBindLocation(); @@ -44,9 +56,11 @@ abstract public class WebTransportServerSupport extends TransportServerSupport { InetAddress addr = InetAddress.getByName(bindHost); host = addr.getCanonicalHostName(); - connector.setHost(host); - connector.setPort(bindAddress.getPort()); - connector.setServer(server); + setConnectorProperty("Host", String.class, host); + setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort()); + if (Server.getVersion().startsWith("8")) { + connector.setServer(server); + } server.addConnector(connector); if (addr.isAnyLocalAddress()) { host = InetAddressUtil.getLocalHostName(); diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java index 96389ec7be..786e3aac8d 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.discovery.http; import java.net.URI; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -27,13 +26,16 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service { private HTTPDiscoveryAgent agent; private Server server; - private SelectChannelConnector connector; private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet(); public void start() throws Exception { URI uri = new URI(agent.getRegistryURL()); - server = new Server(); + int port = 80; + if( uri.getPort() >=0 ) { + port = uri.getPort(); + } + server = new Server(port); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); context.setContextPath("/"); @@ -42,23 +44,9 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service { context.addServlet(holder, "/*"); server.setHandler(context); server.start(); - - int port = 80; - if( uri.getPort() >=0 ) { - port = uri.getPort(); - } - - connector = new SelectChannelConnector(); - connector.setPort(port); - server.addConnector(connector); - connector.start(); } public void stop() throws Exception { - if( connector!=null ) { - connector.stop(); - connector = null; - } if( server!=null ) { server.stop(); server = null; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java index 0c7ecd9cfe..8ae7874d9d 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java @@ -27,8 +27,8 @@ import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.transport.xstream.XStreamWireFormat; import org.apache.activemq.util.ServiceStopper; import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.GzipHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.slf4j.Logger; @@ -77,7 +77,7 @@ public class HttpTransportServer extends WebTransportServerSupport { @Override protected void doStart() throws Exception { - server = new Server(); + createServer(); if (connector == null) { connector = socketConnectorFactory.createConnector(server); } @@ -96,8 +96,7 @@ public class HttpTransportServer extends WebTransportServerSupport { contextHandler.setAttribute("transportFactory", transportFactory); contextHandler.setAttribute("transportOptions", transportOptions); - GzipHandler gzipHandler = new GzipHandler(); - contextHandler.setHandler(gzipHandler); + addGzipHandler(contextHandler); server.start(); @@ -105,8 +104,9 @@ public class HttpTransportServer extends WebTransportServerSupport { // was set to zero so that we report the actual port we are listening on. int port = boundTo.getPort(); - if (connector.getLocalPort() != -1) { - port = connector.getLocalPort(); + int p2 = getConnectorLocalPort(); + if (p2 != -1) { + port = p2; } setConnectURI(new URI(boundTo.getScheme(), @@ -118,6 +118,19 @@ public class HttpTransportServer extends WebTransportServerSupport { boundTo.getFragment())); } + private int getConnectorLocalPort() throws Exception { + return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector); + } + private void addGzipHandler(ServletContextHandler contextHandler) throws Exception { + Handler handler = null; + try { + handler = (Handler)Class.forName("org.eclipse.jetty.server.handler.GzipHandler", true, Handler.class.getClassLoader()).newInstance(); + } catch (Throwable t) { + handler = (Handler)Class.forName("org.eclipse.jetty.servlets.gzip.GzipHandler", true, Handler.class.getClassLoader()).newInstance(); + } + contextHandler.setHandler(handler); + } + @Override protected void doStop(ServiceStopper stopper) throws Exception { Server temp = server; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java index 858c9adb8f..cf3612294a 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java @@ -68,6 +68,14 @@ public class Krb5AndCertsSslSocketConnector extends SslSocketConnector { useCerts = true; setPasswords(); } + public Krb5AndCertsSslSocketConnector(SslContextFactory f, String auth) { + // By default, stick to cert based authentication + super(f); + useKrb = false; + useCerts = true; + setPasswords(); + setMode(auth); + } public static boolean isKrb(String mode) { return mode == MODE.KRB.toString() || mode == MODE.BOTH.toString(); diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 4b75c9a3de..e26027deef 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -49,7 +49,7 @@ public class WSTransportServer extends WebTransportServerSupport { @Override protected void doStart() throws Exception { - server = new Server(); + createServer(); if (connector == null) { connector = socketConnectorFactory.createConnector(server); @@ -69,7 +69,11 @@ public class WSTransportServer extends WebTransportServerSupport { } } - holder.setServlet(new WSServlet()); + if (Server.getVersion().startsWith("8")) { + holder.setServlet(new org.apache.activemq.transport.ws.jetty8.WSServlet()); + } else { + holder.setServlet(new org.apache.activemq.transport.ws.jetty9.WSServlet()); + } contextHandler.addServlet(holder, "/"); contextHandler.setAttribute("acceptListener", getAcceptListener()); @@ -79,9 +83,9 @@ public class WSTransportServer extends WebTransportServerSupport { // Update the Connect To URI with our actual location in case the configured port // was set to zero so that we report the actual port we are listening on. - int port = boundTo.getPort(); - if (connector.getLocalPort() != -1) { - port = connector.getLocalPort(); + int port = getConnectorLocalPort(); + if (port == -1) { + port = boundTo.getPort(); } setConnectURI(new URI(boundTo.getScheme(), @@ -95,6 +99,10 @@ public class WSTransportServer extends WebTransportServerSupport { LOG.info("Listening for connections at {}", getConnectURI()); } + private int getConnectorLocalPort() throws Exception { + return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector); + } + @Override protected void doStop(ServiceStopper stopper) throws Exception { Server temp = server; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java similarity index 99% rename from activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java index 047c459ddb..58e9134f42 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.ws; +package org.apache.activemq.transport.ws.jetty8; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java similarity index 98% rename from activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java index b0da09aea3..dba3ca98c9 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.transport.ws; +package org.apache.activemq.transport.ws.jetty8; import java.io.IOException; import java.util.concurrent.CountDownLatch; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java similarity index 97% rename from activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java index d0ed22da92..d0f7b19d93 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.activemq.transport.ws; +package org.apache.activemq.transport.ws.jetty8; import java.io.IOException; import javax.servlet.ServletException; diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java new file mode 100644 index 0000000000..4d7dac3a2e --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws.jetty9; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; +import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; +import org.apache.activemq.transport.mqtt.MQTTTransport; +import org.apache.activemq.transport.mqtt.MQTTWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.cert.X509Certificate; +import java.util.concurrent.CountDownLatch; + +public class MQTTSocket extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); + Session session; + MQTTProtocolConverter protocolConverter = null; + MQTTWireFormat wireFormat = new MQTTWireFormat(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private BrokerService brokerService; + + private MQTTProtocolConverter getProtocolConverter() { + if( protocolConverter == null ) { + protocolConverter = new MQTTProtocolConverter(this, brokerService); + } + return protocolConverter; + } + + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } + + @Override + public int getReceiveCounter() { + return 0; + } + + @Override + public String getRemoteAddress() { + return "MQTTSocket_" + this.hashCode(); + } + + @Override + public void oneway(Object command) throws IOException { + try { + getProtocolConverter().onActiveMQCommand((Command) command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + @Override + public void sendToMQTT(MQTTFrame command) throws IOException { + ByteSequence bytes = wireFormat.marshal(command); + session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())); + } + + @Override + public X509Certificate[] getPeerCertificates() { + return new X509Certificate[0]; + } + + @Override + public MQTTInactivityMonitor getInactivityMonitor() { + return null; + } + + @Override + public MQTTWireFormat getWireFormat() { + return wireFormat; + } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + @Override + public void onWebSocketBinary(byte[] bytes, int offset, int length) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for StompSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + try { + MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); + getProtocolConverter().onMQTTCommand(frame); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void onWebSocketClose(int arg0, String arg1) { + try { + getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + } catch (Exception e) { + LOG.warn("Failed to close WebSocket", e); + } + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable arg0) { + + } + + @Override + public void onWebSocketText(String arg0) { + } +} diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java new file mode 100644 index 0000000000..811f228faf --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws.jetty9; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.stomp.ProtocolConverter; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompInactivityMonitor; +import org.apache.activemq.transport.stomp.StompTransport; +import org.apache.activemq.transport.stomp.StompWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements web socket and mediates between servlet and the broker + */ +class StompSocket extends TransportSupport implements WebSocketListener, StompTransport { + private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class); + + Session session; + ProtocolConverter protocolConverter = new ProtocolConverter(this, null); + StompWireFormat wireFormat = new StompWireFormat(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat); + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } + + @Override + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + @Override + public int getReceiveCounter() { + return 0; + } + + @Override + public String getRemoteAddress() { + return "StompSocket_" + this.hashCode(); + } + + @Override + public void oneway(Object command) throws IOException { + try { + protocolConverter.onActiveMQCommand((Command)command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + @Override + public void sendToStomp(StompFrame command) throws IOException { + session.getRemote().sendString(command.format()); + } + + @Override + public StompInactivityMonitor getInactivityMonitor() { + return stompInactivityMonitor; + } + + @Override + public StompWireFormat getWireFormat() { + return this.wireFormat; + } + + @Override + public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) { + } + + @Override + public void onWebSocketClose(int arg0, String arg1) { + try { + protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT)); + } catch (Exception e) { + LOG.warn("Failed to close WebSocket", e); + } + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable arg0) { + } + + @Override + public void onWebSocketText(String data) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for StompSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + try { + protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")))); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + +} diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java new file mode 100644 index 0000000000..15927b1a98 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws.jetty9; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.activemq.transport.TransportAcceptListener; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +/** + * Handle connection upgrade requests and creates web sockets + */ +public class WSServlet extends WebSocketServlet { + private static final long serialVersionUID = -4716657876092884139L; + + private TransportAcceptListener listener; + + public void init() throws ServletException { + super.init(); + listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); + if (listener == null) { + throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); + } + } + + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException ,IOException { + getServletContext().getNamedDispatcher("default").forward(request,response); + } + + + public void configure(WebSocketServletFactory factory) { + factory.setCreator(new WebSocketCreator() { + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { + WebSocketListener socket; + if (req.getSubProtocols().contains("mqtt")) { + socket = new MQTTSocket(); + } else { + socket = new StompSocket(); + } + return socket; + } + }); + + } +} + diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java index 92bc1cbb05..140356e846 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java @@ -32,16 +32,18 @@ import javax.net.ServerSocketFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.spring.SpringSslContext; +import org.apache.activemq.transport.SocketConnectorFactory; import org.apache.activemq.transport.stomp.StompConnection; import org.apache.activemq.util.Wait; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.webapp.WebAppContext; + import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; + import org.openqa.selenium.By; import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebElement; @@ -97,7 +99,9 @@ public class WSTransportTest { Server server = new Server(); Connector connector = createJettyConnector(server); - connector.setServer(server); + if (Server.getVersion().startsWith("8")) { + connector.setServer(server); + } WebAppContext context = new WebAppContext(); context.setResourceBase("src/test/webapp"); @@ -129,10 +133,10 @@ public class WSTransportTest { return proxyPort; } - protected Connector createJettyConnector(Server server) { - SelectChannelConnector connector = new SelectChannelConnector(); - connector.setPort(getProxyPort()); - return connector; + protected Connector createJettyConnector(Server server) throws Exception { + Connector c = new SocketConnectorFactory().createConnector(server); + c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort()); + return c; } protected void stopBroker() throws Exception { diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java index ef6114092f..36b33f6319 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java @@ -16,23 +16,23 @@ */ package org.apache.activemq.transport.wss; +import org.apache.activemq.transport.SecureSocketConnectorFactory; import org.apache.activemq.transport.ws.WSTransportTest; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ssl.SslSocketConnector; -import org.eclipse.jetty.util.ssl.SslContextFactory; public class WSSTransportTest extends WSTransportTest { @Override - protected Connector createJettyConnector(Server server) { - SslSocketConnector sslConnector = new SslSocketConnector(); - SslContextFactory contextFactory = sslConnector.getSslContextFactory(); - contextFactory.setKeyStorePath("src/test/resources/server.keystore"); - contextFactory.setKeyStorePassword("password"); - contextFactory.setTrustStore("src/test/resources/client.keystore"); - contextFactory.setTrustStorePassword("password"); - sslConnector.setPort(getProxyPort()); - return sslConnector; + protected Connector createJettyConnector(Server server) throws Exception { + SecureSocketConnectorFactory sscf = new SecureSocketConnectorFactory(); + sscf.setKeyStore("src/test/resources/server.keystore"); + sscf.setKeyStorePassword("password"); + sscf.setTrustStore("src/test/resources/client.keystore"); + sscf.setTrustStorePassword("password"); + + Connector c = sscf.createConnector(server); + c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort()); + return c; } @Override diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index c56877a555..74be454f3e 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -52,7 +52,7 @@ com.fasterxml.jackson*;resolution:=optional, org.codehaus.jettison*;resolution:=optional, org.jasypt*;resolution:=optional, - org.eclipse.jetty*;resolution:=optional, + org.eclipse.jetty*;resolution:=optional;version="[8.1,10)", org.apache.zookeeper*;resolution:=optional, org.fusesource.leveldbjni*;resolution:=optional, org.fusesource.hawtjni*;resolution:=optional, diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml index c14c40f2fb..8002444c26 100755 --- a/activemq-web-console/pom.xml +++ b/activemq-web-console/pom.xml @@ -32,6 +32,7 @@ 8080 + org.mortbay.jetty @@ -51,7 +52,7 @@ - org.mortbay.jetty + ${jetty.maven.groupid} jetty-maven-plugin ${jetty-version}