[AMQ-5517] Runtime support for Jetty 9. Build/compile with Jetty8, but tests pass with Jetty 9 for runtime level support.

This commit is contained in:
Daniel Kulp 2015-01-13 12:47:53 -05:00
parent bc2e2d9a41
commit 3f82625077
18 changed files with 562 additions and 110 deletions

View File

@ -108,6 +108,13 @@
<version>2.25.0</version> <version>2.25.0</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>${jetty9-version}</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -23,8 +23,6 @@ import org.apache.activemq.transport.https.Krb5AndCertsSslSocketConnector;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ssl.SslConnector;
import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
public class SecureSocketConnectorFactory extends SocketConnectorFactory { public class SecureSocketConnectorFactory extends SocketConnectorFactory {
@ -44,75 +42,100 @@ public class SecureSocketConnectorFactory extends SocketConnectorFactory {
private String auth; private String auth;
private SslContext context; private SslContext context;
private SslContextFactory contextFactory;
public SecureSocketConnectorFactory() {
}
public SecureSocketConnectorFactory(SslContext context) { public SecureSocketConnectorFactory(SslContext context) {
this.context = context; this.context = context;
} }
public SecureSocketConnectorFactory(SslContextFactory contextFactory) {
this.contextFactory = contextFactory;
}
@Override @Override
public Connector createConnector(Server server) throws Exception { public Connector createConnector(Server server) throws Exception {
IntrospectionSupport.setProperties(this, getTransportOptions()); if (getTransportOptions() != null) {
SslConnector sslConnector; IntrospectionSupport.setProperties(this, getTransportOptions());
if (Krb5AndCertsSslSocketConnector.isKrb(auth)) {
sslConnector = new Krb5AndCertsSslSocketConnector();
((Krb5AndCertsSslSocketConnector)sslConnector).setMode(auth);
} else {
sslConnector = new SslSelectChannelConnector();
} }
SSLContext sslContext = context == null ? null : context.getSSLContext(); SSLContext sslContext = context == null ? null : context.getSSLContext();
// Get a reference to the current ssl context factory... // Get a reference to the current ssl context factory...
SslContextFactory factory = sslConnector.getSslContextFactory();
if (context != null) { SslContextFactory factory;
if (contextFactory == null) {
// Should not be using this method since it does not use all of the values factory = new SslContextFactory();
// from the passed SslContext instance..... if (context != null) {
factory.setSslContext(sslContext); // Should not be using this method since it does not use all of the values
// from the passed SslContext instance.....
factory.setSslContext(sslContext);
} else {
if (keyStore != null) {
factory.setKeyStorePath(keyStore);
}
if (keyStorePassword != null) {
factory.setKeyStorePassword(keyStorePassword);
}
// if the keyPassword hasn't been set, default it to the
// key store password
if (keyPassword == null && keyStorePassword != null) {
factory.setKeyStorePassword(keyStorePassword);
}
if (keyStoreType != null) {
factory.setKeyStoreType(keyStoreType);
}
if (secureRandomCertficateAlgorithm != null) {
factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm);
}
if (keyCertificateAlgorithm != null) {
factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm);
}
if (trustCertificateAlgorithm != null) {
factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm);
}
if (protocol != null) {
factory.setProtocol(protocol);
}
if (trustStore != null) {
setTrustStore(factory, trustStore);
}
if (trustStorePassword != null) {
factory.setTrustStorePassword(trustStorePassword);
}
}
factory.setNeedClientAuth(needClientAuth);
factory.setWantClientAuth(wantClientAuth);
} else { } else {
factory = contextFactory;
if (keyStore != null) {
factory.setKeyStorePath(keyStore);
}
if (keyStorePassword != null) {
factory.setKeyStorePassword(keyStorePassword);
}
// if the keyPassword hasn't been set, default it to the
// key store password
if (keyPassword == null && keyStorePassword != null) {
factory.setKeyStorePassword(keyStorePassword);
}
if (keyStoreType != null) {
factory.setKeyStoreType(keyStoreType);
}
if (secureRandomCertficateAlgorithm != null) {
factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm);
}
if (keyCertificateAlgorithm != null) {
factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm);
}
if (trustCertificateAlgorithm != null) {
factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm);
}
if (protocol != null) {
factory.setProtocol(protocol);
}
if (trustStore != null) {
factory.setTrustStore(trustStore);
}
if (trustStorePassword != null) {
factory.setTrustStorePassword(trustStorePassword);
}
} }
factory.setNeedClientAuth(needClientAuth);
factory.setWantClientAuth(wantClientAuth);
return sslConnector; if ("KRB".equals(auth) || "BOTH".equals(auth)
&& Server.getVersion().startsWith("8")) {
return new Krb5AndCertsSslSocketConnector(factory, auth);
} else {
try {
Class<?> cls = Class.forName("org.eclipse.jetty.server.ssl.SslSelectChannelConnector", true, Server.class.getClassLoader());
return (Connector)cls.getConstructor(SslContextFactory.class).newInstance(factory);
} catch (Throwable t) {
Class<?> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader());
Connector connector = (Connector)c.getConstructor(Server.class, SslContextFactory.class).newInstance(server, factory);
Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500);
connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500);
return connector;
}
}
} }
private void setTrustStore(SslContextFactory factory, String trustStore2) throws Exception {
String mname = Server.getVersion().startsWith("8") ? "setTrustStore" : "setTrustStorePath";
factory.getClass().getMethod(mname, String.class).invoke(factory, trustStore2);
}
// Properties // Properties
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------

View File

@ -21,15 +21,26 @@ import java.util.Map;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
public class SocketConnectorFactory { public class SocketConnectorFactory {
private Map<String, Object> transportOptions; private Map<String, Object> transportOptions;
public Connector createConnector(Server server) throws Exception { public Connector createConnector(Server server) throws Exception {
SelectChannelConnector connector = new SelectChannelConnector(); Connector connector = null;
IntrospectionSupport.setProperties(connector, transportOptions, "");
try {
connector = (Connector)Class.forName("org.eclipse.jetty.server.nio.SelectChannelConnector", true, Server.class.getClassLoader()).newInstance();
} catch (Throwable t) {
Class<?> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader());
connector = (Connector)c.getConstructor(Server.class).newInstance(server);
Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500);
connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500);
}
System.out.println(transportOptions);
if (transportOptions != null) {
IntrospectionSupport.setProperties(connector, transportOptions, "");
}
return connector; return connector;
} }

View File

@ -35,6 +35,18 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
super(location); super(location);
} }
private <T> void setConnectorProperty(String name, Class<T> type, T value) throws Exception {
connector.getClass().getMethod("set" + name, type).invoke(connector, value);
}
protected void createServer() {
server = new Server();
try {
server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 500l);
} catch (Throwable t) {
//ignore, jetty 8.
}
}
public URI bind() throws Exception { public URI bind() throws Exception {
URI bind = getBindLocation(); URI bind = getBindLocation();
@ -44,9 +56,11 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
InetAddress addr = InetAddress.getByName(bindHost); InetAddress addr = InetAddress.getByName(bindHost);
host = addr.getCanonicalHostName(); host = addr.getCanonicalHostName();
connector.setHost(host); setConnectorProperty("Host", String.class, host);
connector.setPort(bindAddress.getPort()); setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort());
connector.setServer(server); if (Server.getVersion().startsWith("8")) {
connector.setServer(server);
}
server.addConnector(connector); server.addConnector(connector);
if (addr.isAnyLocalAddress()) { if (addr.isAnyLocalAddress()) {
host = InetAddressUtil.getLocalHostName(); host = InetAddressUtil.getLocalHostName();

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.discovery.http;
import java.net.URI; import java.net.URI;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
@ -27,13 +26,16 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service {
private HTTPDiscoveryAgent agent; private HTTPDiscoveryAgent agent;
private Server server; private Server server;
private SelectChannelConnector connector;
private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet(); private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet();
public void start() throws Exception { public void start() throws Exception {
URI uri = new URI(agent.getRegistryURL()); URI uri = new URI(agent.getRegistryURL());
server = new Server(); int port = 80;
if( uri.getPort() >=0 ) {
port = uri.getPort();
}
server = new Server(port);
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
context.setContextPath("/"); context.setContextPath("/");
@ -42,23 +44,9 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service {
context.addServlet(holder, "/*"); context.addServlet(holder, "/*");
server.setHandler(context); server.setHandler(context);
server.start(); server.start();
int port = 80;
if( uri.getPort() >=0 ) {
port = uri.getPort();
}
connector = new SelectChannelConnector();
connector.setPort(port);
server.addConnector(connector);
connector.start();
} }
public void stop() throws Exception { public void stop() throws Exception {
if( connector!=null ) {
connector.stop();
connector = null;
}
if( server!=null ) { if( server!=null ) {
server.stop(); server.stop();
server = null; server = null;

View File

@ -27,8 +27,8 @@ import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat; import org.apache.activemq.transport.xstream.XStreamWireFormat;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.GzipHandler;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -77,7 +77,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
server = new Server(); createServer();
if (connector == null) { if (connector == null) {
connector = socketConnectorFactory.createConnector(server); connector = socketConnectorFactory.createConnector(server);
} }
@ -96,8 +96,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
contextHandler.setAttribute("transportFactory", transportFactory); contextHandler.setAttribute("transportFactory", transportFactory);
contextHandler.setAttribute("transportOptions", transportOptions); contextHandler.setAttribute("transportOptions", transportOptions);
GzipHandler gzipHandler = new GzipHandler(); addGzipHandler(contextHandler);
contextHandler.setHandler(gzipHandler);
server.start(); server.start();
@ -105,8 +104,9 @@ public class HttpTransportServer extends WebTransportServerSupport {
// was set to zero so that we report the actual port we are listening on. // was set to zero so that we report the actual port we are listening on.
int port = boundTo.getPort(); int port = boundTo.getPort();
if (connector.getLocalPort() != -1) { int p2 = getConnectorLocalPort();
port = connector.getLocalPort(); if (p2 != -1) {
port = p2;
} }
setConnectURI(new URI(boundTo.getScheme(), setConnectURI(new URI(boundTo.getScheme(),
@ -118,6 +118,19 @@ public class HttpTransportServer extends WebTransportServerSupport {
boundTo.getFragment())); boundTo.getFragment()));
} }
private int getConnectorLocalPort() throws Exception {
return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector);
}
private void addGzipHandler(ServletContextHandler contextHandler) throws Exception {
Handler handler = null;
try {
handler = (Handler)Class.forName("org.eclipse.jetty.server.handler.GzipHandler", true, Handler.class.getClassLoader()).newInstance();
} catch (Throwable t) {
handler = (Handler)Class.forName("org.eclipse.jetty.servlets.gzip.GzipHandler", true, Handler.class.getClassLoader()).newInstance();
}
contextHandler.setHandler(handler);
}
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
Server temp = server; Server temp = server;

View File

@ -68,6 +68,14 @@ public class Krb5AndCertsSslSocketConnector extends SslSocketConnector {
useCerts = true; useCerts = true;
setPasswords(); setPasswords();
} }
public Krb5AndCertsSslSocketConnector(SslContextFactory f, String auth) {
// By default, stick to cert based authentication
super(f);
useKrb = false;
useCerts = true;
setPasswords();
setMode(auth);
}
public static boolean isKrb(String mode) { public static boolean isKrb(String mode) {
return mode == MODE.KRB.toString() || mode == MODE.BOTH.toString(); return mode == MODE.KRB.toString() || mode == MODE.BOTH.toString();

View File

@ -49,7 +49,7 @@ public class WSTransportServer extends WebTransportServerSupport {
@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
server = new Server(); createServer();
if (connector == null) { if (connector == null) {
connector = socketConnectorFactory.createConnector(server); connector = socketConnectorFactory.createConnector(server);
@ -69,7 +69,11 @@ public class WSTransportServer extends WebTransportServerSupport {
} }
} }
holder.setServlet(new WSServlet()); if (Server.getVersion().startsWith("8")) {
holder.setServlet(new org.apache.activemq.transport.ws.jetty8.WSServlet());
} else {
holder.setServlet(new org.apache.activemq.transport.ws.jetty9.WSServlet());
}
contextHandler.addServlet(holder, "/"); contextHandler.addServlet(holder, "/");
contextHandler.setAttribute("acceptListener", getAcceptListener()); contextHandler.setAttribute("acceptListener", getAcceptListener());
@ -79,9 +83,9 @@ public class WSTransportServer extends WebTransportServerSupport {
// Update the Connect To URI with our actual location in case the configured port // Update the Connect To URI with our actual location in case the configured port
// was set to zero so that we report the actual port we are listening on. // was set to zero so that we report the actual port we are listening on.
int port = boundTo.getPort(); int port = getConnectorLocalPort();
if (connector.getLocalPort() != -1) { if (port == -1) {
port = connector.getLocalPort(); port = boundTo.getPort();
} }
setConnectURI(new URI(boundTo.getScheme(), setConnectURI(new URI(boundTo.getScheme(),
@ -95,6 +99,10 @@ public class WSTransportServer extends WebTransportServerSupport {
LOG.info("Listening for connections at {}", getConnectURI()); LOG.info("Listening for connections at {}", getConnectURI());
} }
private int getConnectorLocalPort() throws Exception {
return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector);
}
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
Server temp = server; Server temp = server;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.ws; package org.apache.activemq.transport.ws.jetty8;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.ws; package org.apache.activemq.transport.ws.jetty8;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.ws; package org.apache.activemq.transport.ws.jetty8;
import java.io.IOException; import java.io.IOException;
import javax.servlet.ServletException; import javax.servlet.ServletException;

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws.jetty9;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTTransport;
import org.apache.activemq.transport.mqtt.MQTTWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
public class MQTTSocket extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
Session session;
MQTTProtocolConverter protocolConverter = null;
MQTTWireFormat wireFormat = new MQTTWireFormat();
private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
private BrokerService brokerService;
private MQTTProtocolConverter getProtocolConverter() {
if( protocolConverter == null ) {
protocolConverter = new MQTTProtocolConverter(this, brokerService);
}
return protocolConverter;
}
protected void doStart() throws Exception {
socketTransportStarted.countDown();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
private boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0;
}
@Override
public int getReceiveCounter() {
return 0;
}
@Override
public String getRemoteAddress() {
return "MQTTSocket_" + this.hashCode();
}
@Override
public void oneway(Object command) throws IOException {
try {
getProtocolConverter().onActiveMQCommand((Command) command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
@Override
public void sendToActiveMQ(Command command) {
doConsume(command);
}
@Override
public void sendToMQTT(MQTTFrame command) throws IOException {
ByteSequence bytes = wireFormat.marshal(command);
session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
}
@Override
public X509Certificate[] getPeerCertificates() {
return new X509Certificate[0];
}
@Override
public MQTTInactivityMonitor getInactivityMonitor() {
return null;
}
@Override
public MQTTWireFormat getWireFormat() {
return wireFormat;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public void onWebSocketBinary(byte[] bytes, int offset, int length) {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for StompSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
try {
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
getProtocolConverter().onMQTTCommand(frame);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
@Override
public void onWebSocketClose(int arg0, String arg1) {
try {
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
} catch (Exception e) {
LOG.warn("Failed to close WebSocket", e);
}
}
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
}
@Override
public void onWebSocketError(Throwable arg0) {
}
@Override
public void onWebSocketText(String arg0) {
}
}

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.stomp.ProtocolConverter;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.StompInactivityMonitor;
import org.apache.activemq.transport.stomp.StompTransport;
import org.apache.activemq.transport.stomp.StompWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements web socket and mediates between servlet and the broker
*/
class StompSocket extends TransportSupport implements WebSocketListener, StompTransport {
private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class);
Session session;
ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
StompWireFormat wireFormat = new StompWireFormat();
private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
private boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0;
}
@Override
protected void doStart() throws Exception {
socketTransportStarted.countDown();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
@Override
public int getReceiveCounter() {
return 0;
}
@Override
public String getRemoteAddress() {
return "StompSocket_" + this.hashCode();
}
@Override
public void oneway(Object command) throws IOException {
try {
protocolConverter.onActiveMQCommand((Command)command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
@Override
public void sendToActiveMQ(Command command) {
doConsume(command);
}
@Override
public void sendToStomp(StompFrame command) throws IOException {
session.getRemote().sendString(command.format());
}
@Override
public StompInactivityMonitor getInactivityMonitor() {
return stompInactivityMonitor;
}
@Override
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
@Override
public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
}
@Override
public void onWebSocketClose(int arg0, String arg1) {
try {
protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT));
} catch (Exception e) {
LOG.warn("Failed to close WebSocket", e);
}
}
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
}
@Override
public void onWebSocketError(Throwable arg0) {
}
@Override
public void onWebSocketText(String data) {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for StompSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
try {
protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))));
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.transport.TransportAcceptListener;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
/**
* Handle connection upgrade requests and creates web sockets
*/
public class WSServlet extends WebSocketServlet {
private static final long serialVersionUID = -4716657876092884139L;
private TransportAcceptListener listener;
public void init() throws ServletException {
super.init();
listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener");
if (listener == null) {
throw new ServletException("No such attribute 'acceptListener' available in the ServletContext");
}
}
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException ,IOException {
getServletContext().getNamedDispatcher("default").forward(request,response);
}
public void configure(WebSocketServletFactory factory) {
factory.setCreator(new WebSocketCreator() {
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
WebSocketListener socket;
if (req.getSubProtocols().contains("mqtt")) {
socket = new MQTTSocket();
} else {
socket = new StompSocket();
}
return socket;
}
});
}
}

View File

@ -32,16 +32,18 @@ import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.transport.SocketConnectorFactory;
import org.apache.activemq.transport.stomp.StompConnection; import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.webapp.WebAppContext; import org.eclipse.jetty.webapp.WebAppContext;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.openqa.selenium.By; import org.openqa.selenium.By;
import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement; import org.openqa.selenium.WebElement;
@ -97,7 +99,9 @@ public class WSTransportTest {
Server server = new Server(); Server server = new Server();
Connector connector = createJettyConnector(server); Connector connector = createJettyConnector(server);
connector.setServer(server); if (Server.getVersion().startsWith("8")) {
connector.setServer(server);
}
WebAppContext context = new WebAppContext(); WebAppContext context = new WebAppContext();
context.setResourceBase("src/test/webapp"); context.setResourceBase("src/test/webapp");
@ -129,10 +133,10 @@ public class WSTransportTest {
return proxyPort; return proxyPort;
} }
protected Connector createJettyConnector(Server server) { protected Connector createJettyConnector(Server server) throws Exception {
SelectChannelConnector connector = new SelectChannelConnector(); Connector c = new SocketConnectorFactory().createConnector(server);
connector.setPort(getProxyPort()); c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
return connector; return c;
} }
protected void stopBroker() throws Exception { protected void stopBroker() throws Exception {

View File

@ -16,23 +16,23 @@
*/ */
package org.apache.activemq.transport.wss; package org.apache.activemq.transport.wss;
import org.apache.activemq.transport.SecureSocketConnectorFactory;
import org.apache.activemq.transport.ws.WSTransportTest; import org.apache.activemq.transport.ws.WSTransportTest;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ssl.SslSocketConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class WSSTransportTest extends WSTransportTest { public class WSSTransportTest extends WSTransportTest {
@Override @Override
protected Connector createJettyConnector(Server server) { protected Connector createJettyConnector(Server server) throws Exception {
SslSocketConnector sslConnector = new SslSocketConnector(); SecureSocketConnectorFactory sscf = new SecureSocketConnectorFactory();
SslContextFactory contextFactory = sslConnector.getSslContextFactory(); sscf.setKeyStore("src/test/resources/server.keystore");
contextFactory.setKeyStorePath("src/test/resources/server.keystore"); sscf.setKeyStorePassword("password");
contextFactory.setKeyStorePassword("password"); sscf.setTrustStore("src/test/resources/client.keystore");
contextFactory.setTrustStore("src/test/resources/client.keystore"); sscf.setTrustStorePassword("password");
contextFactory.setTrustStorePassword("password");
sslConnector.setPort(getProxyPort()); Connector c = sscf.createConnector(server);
return sslConnector; c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
return c;
} }
@Override @Override

View File

@ -52,7 +52,7 @@
com.fasterxml.jackson*;resolution:=optional, com.fasterxml.jackson*;resolution:=optional,
org.codehaus.jettison*;resolution:=optional, org.codehaus.jettison*;resolution:=optional,
org.jasypt*;resolution:=optional, org.jasypt*;resolution:=optional,
org.eclipse.jetty*;resolution:=optional, org.eclipse.jetty*;resolution:=optional;version="[8.1,10)",
org.apache.zookeeper*;resolution:=optional, org.apache.zookeeper*;resolution:=optional,
org.fusesource.leveldbjni*;resolution:=optional, org.fusesource.leveldbjni*;resolution:=optional,
org.fusesource.hawtjni*;resolution:=optional, org.fusesource.hawtjni*;resolution:=optional,

View File

@ -32,6 +32,7 @@
<properties> <properties>
<jetty.port>8080</jetty.port> <jetty.port>8080</jetty.port>
<jetty.maven.groupid>org.mortbay.jetty</jetty.maven.groupid>
</properties> </properties>
<build> <build>
@ -51,7 +52,7 @@
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>
<groupId>org.mortbay.jetty</groupId> <groupId>${jetty.maven.groupid}</groupId>
<artifactId>jetty-maven-plugin</artifactId> <artifactId>jetty-maven-plugin</artifactId>
<version>${jetty-version}</version> <version>${jetty-version}</version>
<configuration> <configuration>