diff --git a/activemq-optional/pom.xml b/activemq-optional/pom.xml index 3a0333dc8b..a4f3b9559b 100755 --- a/activemq-optional/pom.xml +++ b/activemq-optional/pom.xml @@ -94,6 +94,11 @@ jetty-webapp ${jetty-version} + + org.eclipse.jetty + jetty-websocket + ${jetty-version} + axis diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompServlet.java b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompServlet.java new file mode 100644 index 0000000000..267803c5b4 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompServlet.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.activemq.transport.TransportAcceptListener; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketServlet; + +/** + * + * Handle connection upgrade requests and creates web sockets + * + */ +public class StompServlet extends WebSocketServlet { + + private static final long serialVersionUID = -4716657876092884139L; + + private TransportAcceptListener listener; + + public void init() throws ServletException { + super.init(); + listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); + if (listener == null) { + throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); + } + } + + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException ,IOException { + getServletContext().getNamedDispatcher("default").forward(request,response); + } + + protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) { + StompSocket socket = new StompSocket(); + listener.onAccept(socket); + return socket; + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java new file mode 100644 index 0000000000..bcaf331a07 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +import java.io.IOException; +import java.security.cert.X509Certificate; + +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.stomp.LegacyFrameTranslator; +import org.apache.activemq.transport.stomp.ProtocolConverter; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompTransport; +import org.apache.activemq.transport.stomp.StompWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.WebSocket; + +/** + * + * Implements web socket and mediates between servlet and the broker + * + */ +class StompSocket extends TransportSupport implements WebSocket, StompTransport { + Outbound outbound; + ProtocolConverter protocolConverter = new ProtocolConverter(this, new LegacyFrameTranslator(), null); + StompWireFormat wireFormat = new StompWireFormat(); + + public void onConnect(Outbound outbound) { + this.outbound=outbound; + } + + public void onMessage(byte frame, byte[] data,int offset, int length) {} + + public void onMessage(byte frame, String data) { + try { + protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes()))); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + public void onDisconnect() { + } + + protected void doStart() throws Exception { + } + + protected void doStop(ServiceStopper stopper) throws Exception { + } + + public int getReceiveCounter() { + return 0; + } + + public String getRemoteAddress() { + return "StompSocket_" + this.hashCode(); + } + + public void oneway(Object command) throws IOException { + try { + protocolConverter.onActiveMQCommand((Command)command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + public X509Certificate[] getPeerCertificates() { + return null; + } + + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + public void sendToStomp(StompFrame command) throws IOException { + outbound.sendMessage(WebSocket.SENTINEL_FRAME, command.toString()); + } +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java new file mode 100644 index 0000000000..906fbd6dcf --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws; + +import java.io.IOException; +import java.net.URI; + +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportServer; + +/** + * + * Factory for WebSocket (ws) transport + * + */ +public class WSTransportFactory extends TransportFactory { + + public TransportServer doBind(URI location) throws IOException { + return new WSTransportServer(location); + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java new file mode 100644 index 0000000000..dfd78cae03 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws; + +import java.net.InetSocketAddress; +import java.net.URI; + +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.transport.TransportServerSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.bio.SocketConnector; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.session.SessionHandler; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlet.ServletMapping; + +/** + * Creates a web server and registers web socket server + * + */ +public class WSTransportServer extends TransportServerSupport { + + private URI bindAddress; + private Server server; + private Connector connector; + + public WSTransportServer(URI location) { + super(location); + this.bindAddress = location; + } + + protected void doStart() throws Exception { + server = new Server(); + if (connector == null) { + connector = new SocketConnector(); + } + connector.setHost(bindAddress.getHost()); + connector.setPort(bindAddress.getPort()); + server.setConnectors(new Connector[] { + connector + }); + + ContextHandler contextHandler = new ContextHandler(); + contextHandler.setContextPath("/"); + contextHandler.setServer(server); + server.setHandler(contextHandler); + + SessionHandler sessionHandler = new SessionHandler(); + contextHandler.setHandler(sessionHandler); + + ServletHandler servletHandler = new ServletHandler(); + sessionHandler.setHandler(servletHandler); + + ServletHolder holder = new ServletHolder(); + holder.setName("WSStomp"); + holder.setClassName(StompServlet.class.getName()); + servletHandler.setServlets(new ServletHolder[] { + holder + }); + + ServletMapping mapping = new ServletMapping(); + mapping.setServletName("WSStomp"); + mapping.setPathSpec("/*"); + servletHandler.setServletMappings(new ServletMapping[] { + mapping + }); + + contextHandler.setAttribute("acceptListener", getAcceptListener()); + + server.start(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + Server temp = server; + server = null; + if (temp != null) { + temp.stop(); + } + } + + public InetSocketAddress getSocketAddress() { + return null; + } + + public void setBrokerInfo(BrokerInfo brokerInfo) { + } + +} diff --git a/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/ws b/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/ws new file mode 100644 index 0000000000..3783dfc90e --- /dev/null +++ b/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/ws @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.ws.WSTransportFactory \ No newline at end of file diff --git a/activemq-optional/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-optional/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java new file mode 100644 index 0000000000..dac379d8f7 --- /dev/null +++ b/activemq-optional/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws; + +import java.net.URI; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.Test; + + +public class WSTransportTest { + + protected String getBindLocation() { + return "ws://localhost:61614"; + } + + @Test + public void testBrokerStart() throws Exception { + BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false")); + broker.addConnector(getBindLocation()); + broker.start(); + broker.waitUntilStarted(); + Thread.sleep(2000); + //System.in.read(); + broker.stop(); + broker.waitUntilStopped(); + } + +}