git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1499777 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-07-04 14:54:29 +00:00
parent 2019a21d96
commit 33ec1cf99b
10 changed files with 2407 additions and 6 deletions

View File

@ -39,6 +39,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTTransport;
import org.apache.activemq.transport.mqtt.MQTTWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.eclipse.jetty.websocket.WebSocket;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
Connection outbound;
MQTTProtocolConverter protocolConverter = new MQTTProtocolConverter(this, null);
MQTTWireFormat wireFormat = new MQTTWireFormat();
private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
@Override
public void onMessage(byte[] bytes, int offset, int length) {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for StompSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
try {
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
protocolConverter.onMQTTCommand(frame);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
@Override
public void onOpen(Connection connection) {
this.outbound = connection;
}
@Override
public void onClose(int closeCode, String message) {
try {
protocolConverter.onMQTTCommand(new DISCONNECT().encode());
} catch (Exception e) {
LOG.warn("Failed to close WebSocket", e);
}
}
protected void doStart() throws Exception {
socketTransportStarted.countDown();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
private boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0;
}
@Override
public int getReceiveCounter() {
return 0;
}
@Override
public String getRemoteAddress() {
return "MQTTSocket_" + this.hashCode();
}
@Override
public void oneway(Object command) throws IOException {
try {
protocolConverter.onActiveMQCommand((Command)command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
}
@Override
public void sendToActiveMQ(Command command) {
doConsume(command);
}
@Override
public void sendToMQTT(MQTTFrame command) throws IOException {
ByteSequence bytes = wireFormat.marshal(command);
outbound.sendMessage(bytes.getData(), 0, bytes.getLength());
}
@Override
public X509Certificate[] getPeerCertificates() {
return new X509Certificate[0];
}
@Override
public MQTTInactivityMonitor getInactivityMonitor() {
return null;
}
@Override
public MQTTWireFormat getWireFormat() {
return wireFormat;
}
}

View File

@ -22,6 +22,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketServlet;
@ -29,7 +30,7 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
/**
* Handle connection upgrade requests and creates web sockets
*/
public class StompServlet extends WebSocketServlet {
public class WSServlet extends WebSocketServlet {
private static final long serialVersionUID = -4716657876092884139L;
private TransportAcceptListener listener;
@ -49,8 +50,13 @@ public class StompServlet extends WebSocketServlet {
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
StompSocket socket = new StompSocket();
listener.onAccept(socket);
WebSocket socket;
if (protocol.startsWith("mqtt")) {
socket = new MQTTSocket();
} else {
socket = new StompSocket();
}
listener.onAccept((Transport)socket);
return socket;
}
}

View File

@ -65,7 +65,7 @@ public class WSTransportServer extends WebTransportServerSupport {
}
}
holder.setServlet(new StompServlet());
holder.setServlet(new WSServlet());
contextHandler.addServlet(holder, "/");
contextHandler.setAttribute("acceptListener", getAcceptListener());

View File

@ -79,7 +79,7 @@ import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class MQTTProtocolConverter {
public class MQTTProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* {
margin: 0;
padding: 0;
}
body {
font-family: 'Helvetica Neue', Helvetica, Verdana, Arial, sans-serif;
padding: 10px;
}
#disconnect {
display: none;
}
#subscribe {
display: none;
}
#debug {
background-color: #F0F0F0;
font-size: 12px;
width: 800px;
margin-left: 0px;
margin-top: 10px;
margin-right: 0px;
padding: 10px;
border: 1px solid #CCC;
}
#send_form {
width: 800px;
bottom: 10px;
}
#send_form #send_form_input {
border: 1px solid #CCC;
font-size: 16px;
height: 20px;
padding: 10px;
width: 800px;
}
#send_form input[disabled] {
background-color: #EEE;
}
#messages {
bottom: 25px;
left: 0;
overflow: auto;
padding: 5px;
right: 0;
top: 2em;
z-index: -1;
}
.message {
width: 95%;
}
.timestamp {
font-size: 12px;
}
.me, .nick {
float: left;
width: 100px;
}
.me {
color: #F99;
}
.nick {
color: #99F;
}
.status {
background-color: #DDD;
}
form dt {
clear:both;
width:19%;
float:left;
text-align:right;
}
form dd {
float:left;
width:80%;
margin:0 0 0.5em 0.25em;
}
input {
width: 320px;
}

View File

@ -0,0 +1,91 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the 'License'); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
$(document).ready(function(){
var client, destination;
$('#connect_form').submit(function() {
var host = $("#connect_host").val();
var port = $("#connect_port").val();
var clientId = $("#connect_clientId").val();
destination = $("#destination").val();
client = new Messaging.Client(host, Number(port), clientId);
client.onConnect = onConnect;
client.onMessageArrived = onMessageArrived;
client.onConnectionLost = onConnectionLost;
client.connect({onSuccess:onConnect, onFailure:onFailure});
return false;
});
// the client is notified when it is connected to the server.
var onConnect = function(frame) {
debug("connected to MQTT");
$('#connect').fadeOut({ duration: 'fast' });
$('#disconnect').fadeIn();
$('#send_form_input').removeAttr('disabled');
client.subscribe(destination);
};
// this allows to display debug logs directly on the web page
var debug = function(str) {
$("#debug").append(document.createTextNode(str + "\n"));
};
$('#disconnect_form').submit(function() {
client.disconnect();
$('#disconnect').fadeOut({ duration: 'fast' });
$('#connect').fadeIn();
$('#send_form_input').addAttr('disabled');
return false;
});
$('#send_form').submit(function() {
var text = $('#send_form_input').val();
if (text) {
message = new Messaging.Message(text);
message.destinationName = destination;
client.send(message);
$('#send_form_input').val("");
}
return false;
});
function onFailure(failure) {
debug("failure");
debug(failure.errorMessage);
}
function onMessageArrived(message) {
var p = document.createElement("p");
var t = document.createTextNode(message.payloadString);
p.appendChild(t);
$("#messages").append(p);
}
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0) {
debug(client.clientId + ": " + responseObject.errorCode + "\n");
}
}
});

View File

@ -0,0 +1,174 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1" />
<link rel="stylesheet" href="chat.css" type="text/css">
<style type="text/css" media="screen">
@import url(../styles/sorttable.css);
@import url(../styles/type-settings.css);
@import url(../styles/site.css);
@import url(../styles/prettify.css);
</style>
<title>Chat Example Using Stomp Over WebSocket</title>
<link rel="stylesheet" type="text/css" href="chat.css"></link>
<script type="text/javascript" src="../js/jquery-1.4.2.min.js"></script>
<script src='mqttws31.js'></script>
<script src='chat.js'></script>
<script>
$(document).ready(function() {
var supported = ("WebSocket" in window);
if(!supported) {
var msg = "Your browser does not support Web Sockets. This example will not work properly.<br>";
msg += "Please use a Web Browser with Web Sockets support (WebKit or Google Chrome).";
$("#connect").html(msg);
}
});
</script>
</head>
<body>
<div class="white_box">
<div class="header">
<div class="header_l">
<div class="header_r">
</div>
</div>
</div>
<div class="content">
<div class="content_l">
<div class="content_r">
<div>
<!-- Banner -->
<div id="asf_logo">
<div id="activemq_logo">
<a style="float:left; width:280px;display:block;text-indent:-5000px;text-decoration:none;line-height:60px; margin-top:10px; margin-left:100px;"
href="http://activemq.apache.org/"
title="The most popular and powerful open source Message Broker">ActiveMQ</a>
<a style="float:right; width:210px;display:block;text-indent:-5000px;text-decoration:none;line-height:60px; margin-top:15px; margin-right:10px;"
href="http://www.apache.org/" title="The Apache Software Foundation">ASF</a>
</div>
</div>
<div class="top_red_bar">
<div id="site-breadcrumbs">
<a href="../index.html" title="Home">Home</a>
</div>
<div id="site-quicklinks"><P>
<a href="http://activemq.apache.org/support.html"
title="Get help and support using Apache ActiveMQ">Support</a></p>
</div>
</div>
<table border="0">
<tbody>
<tr>
<td valign="top" width="100%" style="overflow:hidden;">
<div class="body-content">
<div id='connect'>
<form id='connect_form'>
<dl>
<dt><label for=connect_url>Server Host</label></dt>
<dd><input name=url id='connect_host' value='localhost'></dd>
<dt><label for=connect_port>Server Port</label></dt>
<dd><input id='connect_port' placeholder="Server Port" value="61614"></dd>
<dt><label for=connect_clientId>Client ID</label></dt>
<dd><input id='connect_clientId' placeholder="Client ID" value="clientId"></dd>
<dt><label for=destination>Destination</label></dt>
<dd><input id='destination' placeholder="Destination" value="/test"></dd>
<dt>&nbsp;</dt>
<dd><input type=submit id='connect_submit' value="Connect"></dd>
</dl>
</form>
<p><b>Make sure you have enabled <a href="http://activemq.apache.org/websockets.html">websockets transport</a> before running this demo</b></p>
<p>This is adapted <a href="https://github.com/jmesnil/stomp-websocket">stomp-websocket</a> library demo</p>
<p>Use the form above to connect to the Stomp server and subscribe to the destination.</p>
<p>Once connected, you can send messages to the destination with the text field at the bottom of this page</p>
</div>
<div id="disconnect">
<form id='disconnect_form'>
<input type=submit id='disconnect_submit' value="Disconnect">
</form>
</div>
<div id="messages">
</div>
<form id='send_form'>
<input id='send_form_input' placeholder="Type your message here" disabled />
</form>
<pre id="debug"></pre>
</div>
</td>
<td valign="top">
<div class="navigation">
<div class="navigation_top">
<div class="navigation_bottom">
<H3>Useful Links</H3>
<ul class="alternate" type="square">
<li><a href="http://activemq.apache.org/"
title="The most popular and powerful open source Message Broker">Documentation</a></li>
<li><a href="http://activemq.apache.org/faq.html">FAQ</a></li>
<li><a href="http://activemq.apache.org/download.html">Downloads</a>
</li>
<li><a href="http://activemq.apache.org/discussion-forums.html">Forums</a>
</li>
</ul>
</div>
</div>
</div>
</td>
</tr>
</tbody>
</table>
<div class="bottom_red_bar"></div>
</div>
</div>
</div>
</div>
<div class="black_box">
<div class="footer">
<div class="footer_l">
<div class="footer_r">
<div>
Copyright 2005-2012 The Apache Software Foundation.
(<a href="?printable=true">printable version</a>)
</div>
</div>
</div>
</div>
</div>
</div>
<div class="design_attribution"><a href="http://hiramchirino.com/">Graphic Design By Hiram</a></div>
</body>
</html>

File diff suppressed because it is too large Load Diff

View File

@ -39,7 +39,10 @@ $(document).ready(function(){
$('#send_form_input').removeAttr('disabled');
client.subscribe(destination, function(message) {
$("#messages").append(document.createTextNode("<p>" + message.body + "</p>\n"));
var p = document.createElement("p");
var t = document.createTextNode(message.body);
p.appendChild(t);
$("#messages").append(p);
});
};
client.connect(login, passcode, onconnect);