git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@997779 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-09-16 15:05:32 +00:00
parent f9379664f7
commit bc2a7f1ae0
8 changed files with 1478 additions and 70 deletions

View File

@ -146,19 +146,21 @@ org.activemq.Amq = function() {
adapter.ajax(uri, options); adapter.ajax(uri, options);
}; };
var sendJmsMessage = function(destination, message, type) { var sendJmsMessage = function(destination, message, type, headers) {
// Add message to outbound queue var message = {
if (batchInProgress) {
messageQueue[messageQueue.length] = {
destination: destination, destination: destination,
message: message, message: message,
messageType: type messageType: type
}; };
// Add message to outbound queue
if (batchInProgress) {
messageQueue[messageQueue.length] = {message:message, headers:headers};
} else { } else {
org.activemq.Amq.startBatch(); org.activemq.Amq.startBatch();
adapter.ajax(uri, { method: 'post', adapter.ajax(uri, { method: 'post',
data: 'destination=' + destination + '&message=' + message + '&type=' + type, data: buildParams( [message] ),
error: errorHandler, error: errorHandler,
headers: headers,
success: org.activemq.Amq.endBatch}); success: org.activemq.Amq.endBatch});
} }
}; };
@ -197,11 +199,30 @@ org.activemq.Amq = function() {
endBatch : function() { endBatch : function() {
if (messageQueue.length > 0) { if (messageQueue.length > 0) {
var body = buildParams(messageQueue); var messagesToSend = [];
messageQueue.length = 0; var messagesToQueue = [];
var outgoingHeaders = null;
// we need to ensure that messages which set headers are sent by themselves.
// if 2 'listen' messages were sent together, and a 'selector' header were added to one of them,
// AMQ would add the selector to both 'listen' commands.
for(i=0;i<messageQueue.length;i++) {
// a message with headers should always be sent by itself. if other messages have been added, send this one later.
if ( messageQueue[ i ].headers && messagesToSend.length == 0 ) {
messagesToSend[ messagesToSend.length ] = messageQueue[ i ].message;
outgoingHeaders = messageQueue[ i ].headers;
} else if ( ! messageQueue[ i ].headers && ! outgoingHeaders ) {
messagesToSend[ messagesToSend.length ] = messageQueue[ i ].message;
} else {
messagesToQueue[ messagesToQueue.length ] = messageQueue[ i ];
}
}
var body = buildParams(messagesToSend);
messageQueue = messagesToQueue;
org.activemq.Amq.startBatch(); org.activemq.Amq.startBatch();
adapter.ajax(uri, { adapter.ajax(uri, {
method: 'post', method: 'post',
headers: outgoingHeaders,
data: body, data: body,
success: org.activemq.Amq.endBatch, success: org.activemq.Amq.endBatch,
error: errorHandler}); error: errorHandler});
@ -218,15 +239,30 @@ org.activemq.Amq = function() {
// Listen on a channel or topic. // Listen on a channel or topic.
// handler must be a function taking a message argument // handler must be a function taking a message argument
addListener : function(id, destination, handler) { //
// Supported options:
// selector: If supplied, it should be a SQL92 string like "property-name='value'"
// http://activemq.apache.org/selectors.html
//
// Example: addListener( 'handler', 'topic://test-topic', function(msg) { return msg; }, { selector: "property-name='property-value'" } )
addListener : function(id, destination, handler, options) {
messageHandlers[id] = handler; messageHandlers[id] = handler;
sendJmsMessage(destination, id, 'listen'); var headers = options && options.selector ? {selector:options.selector} : null;
sendJmsMessage(destination, id, 'listen', headers);
}, },
// remove Listener from channel or topic. // remove Listener from channel or topic.
removeListener : function(id, destination) { removeListener : function(id, destination) {
messageHandlers[id] = null; messageHandlers[id] = null;
sendJmsMessage(destination, id, 'unlisten'); sendJmsMessage(destination, id, 'unlisten');
},
// for unit testing
getMessageQueue: function() {
return messageQueue;
},
testPollHandler: function( data ) {
return pollHandler( data );
} }
}; };
}(); }();

View File

@ -45,6 +45,7 @@ org.activemq.AmqAdapter = {
* - xhr: The XmlHttpRequest object. * - xhr: The XmlHttpRequest object.
* - status: A text string of the status. * - status: A text string of the status.
* - ex: The exception that caused the error. * - ex: The exception that caused the error.
* - headers: An object containing additional headers for the ajax request.
*/ */
ajax: function(uri, options) { ajax: function(uri, options) {
if (options.method == 'post') { if (options.method == 'post') {
@ -52,6 +53,7 @@ org.activemq.AmqAdapter = {
url: uri, url: uri,
handleAs: "xml", handleAs: "xml",
postData: options.data, postData: options.data,
headers: options.headers,
load : options.success ? options.success : function() {}, load : options.success ? options.success : function() {},
error: options.error ? function(ex, ioargs) { error: options.error ? function(ex, ioargs) {
options.error(ioargs.xhr,ioargs.xhr.status, ex); options.error(ioargs.xhr,ioargs.xhr.status, ex);
@ -66,6 +68,7 @@ org.activemq.AmqAdapter = {
dojo.xhrGet({ dojo.xhrGet({
url: uri, url: uri,
handleAs: "xml", handleAs: "xml",
headers: options.headers,
load : options.success ? options.success : function() {}, load : options.success ? options.success : function() {},
error: options.error ? function(ex, ioargs) { error: options.error ? function(ex, ioargs) {
options.error(ioargs.xhr,ioargs.xhr.status, ex); options.error(ioargs.xhr,ioargs.xhr.status, ex);
@ -77,4 +80,5 @@ org.activemq.AmqAdapter = {
log: function(message, exception) { log: function(message, exception) {
if (typeof console != 'undefined' && console.log) console.log(message); if (typeof console != 'undefined' && console.log) console.log(message);
} }
}; };

View File

@ -45,33 +45,41 @@ org.activemq.AmqAdapter = {
* - xhr: The XmlHttpRequest object. * - xhr: The XmlHttpRequest object.
* - status: A text string of the status. * - status: A text string of the status.
* - ex: The exception that caused the error. * - ex: The exception that caused the error.
* - headers: An object containing additional headers for the ajax request.
*/ */
ajax: function(uri, options) { ajax: function(uri, options) {
if (options.method == 'post') { request = {
jQuery.ajax({
type: "POST",
url: uri, url: uri,
data: options.data, data: options.data,
success: options.success || function(){}, success: options.success || function(){},
error: options.error || function(){}, error: options.error || function(){}
beforeSend: function(xhr) { }
var headers = {};
if( options.headers ) {
headers = options.headers;
}
if (options.method == 'post') {
request.type = 'POST';
/* Force "Connection: close" for Mozilla browsers to work around /* Force "Connection: close" for Mozilla browsers to work around
* a bug where XMLHttpReqeuest sends an incorrect Content-length * a bug where XMLHttpReqeuest sends an incorrect Content-length
* header. See Mozilla Bugzilla #246651. * header. See Mozilla Bugzilla #246651.
*/ */
xhr.setRequestHeader("Connection", 'close'); headers[ 'Connection' ] = 'close';
}
});
} else { } else {
jQuery.ajax({ request.type = 'GET';
type: "GET", request.dataType = 'xml';
url: uri,
data: options.data,
success: options.success || function(){},
error: options.error || function(){},
dataType: 'xml'
});
} }
if( headers ) {
request.beforeSend = function(xhr) {
for( h in headers ) {
xhr.setRequestHeader( h, headers[ h ] );
}
}
}
jQuery.ajax( request );
}, },
log: function(message, exception) { log: function(message, exception) {

View File

@ -45,12 +45,10 @@ org.activemq.AmqAdapter = {
* - xhr: The XmlHttpRequest object. * - xhr: The XmlHttpRequest object.
* - status: A text string of the status. * - status: A text string of the status.
* - ex: The exception that caused the error. * - ex: The exception that caused the error.
* - headers: An object containing additional headers for the ajax request.
*/ */
ajax: function(uri, options) { ajax: function(uri, options) {
if (options.method == 'post') { request = {
new Ajax.Request(uri, {
method: "post",
postBody: options.data,
onSuccess: options.success ? function(xhr, header) { onSuccess: options.success ? function(xhr, header) {
if (options.success) { if (options.success) {
var ct = xhr.getResponseHeader("content-type"); var ct = xhr.getResponseHeader("content-type");
@ -63,28 +61,24 @@ org.activemq.AmqAdapter = {
}, },
onException: options.error || function() { onException: options.error || function() {
} }
}); }
if( options.headers ) {
request.requestHeaders = options.headers;
}
if (options.method == 'post') {
request.postBody = options.data;
} else { } else {
new Ajax.Request(uri, { request.parameters = options.data;
method: "get", request.method = 'get';
parameters: options.data,
onSuccess: function(xhr, header) {
if (options.success) {
var ct = xhr.getResponseHeader("content-type");
var xml = ct && ct.indexOf("xml") >= 0;
var data = xml ? xhr.responseXML : xhr.responseText;
options.success(data);
}
},
onFailure: options.error || function() {
},
onException: options.error || function() {
}
});
} }
new Ajax.Request( uri, request );
}, },
log: function(message, exception) { log: function(message, exception) {
if (typeof console != 'undefined' && console.log) console.log(message); if (typeof console != 'undefined' && console.log) console.log(message);
} }
}; };

View File

@ -0,0 +1,291 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<title>AMQ test</title>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<script src="assets/jsunittest.js" type="text/javascript"></script>
<script>
var org = org || {};
org.activemq = org.activemq || {};
org.activemq.AmqAdapter = {
// implement org.activemq.AmqAdapter API
init: function(options) {},
ajax: function(uri, options) {
ajaxRequests[ajaxRequests.length] = { uri:uri, options:options };
},
// add additional functionality for testing.
ajaxRequests: [],
getRequests: function() {
return ajaxRequests;
},
reset: function() {
ajaxRequests=[];
}
};
</script>
<script src="../js/amq.js" type="text/javascript"></script>
<link rel="stylesheet" href="assets/unittest.css" type="text/css" />
</head>
<body>
<div id="content">
<div id="header">
<h1>AMQ tests</h1>
<p>
This file tests amq.js.
</p>
</div>
<!-- Log output (one per Runner, via {testLog: "testlog"} option)-->
<div id="testlog"></div>
<!-- Put sample/test html here -->
<div id="sample">
</div>
</div>
<script type="text/javascript">
function createXmlFromString( xmlString ) {
// http://developer.taboca.com/cases/en/client-javascript-dom-parser/
// Mozilla and Netscape browsers
if (document.implementation.createDocument) {
var parser = new DOMParser()
response = parser.parseFromString( xmlString, "text/xml")
// MSIE
} else if (window.ActiveXObject) {
response = new ActiveXObject("Microsoft.XMLDOM")
response.async="false"
response.loadXML( xmlString )
}
return response;
}
// <![CDATA[
new Test.Unit.Runner({
setup: function() {
org.activemq.AmqAdapter.reset();
org.activemq.Amq.init({ uri: '../amq', timeout: 30 });
},
teardown: function() {
org.activemq.Amq.endBatch();
},
testMessagesAreSentToUrlDefinedInInit: function() { with( this ) {
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( '../amq', requests[ 0 ].uri );
assertEqual( '../amq', requests[ 1 ].uri );
}},
testFirstMessageIsAPoll: function() { with( this ) {
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 'get', requests[ 0 ].options.method );
assert( requests[ 0 ].options.data.match( /timeout=30000&d=\d+&r=[\d.]+/ ) );
}},
testPostIsSentIfNoBatchIsInProgress: function() { with( this ) {
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( 'post', requests[ 1 ].options.method );
assertEqual( 'destination=queue://test&message=<message>test</message>&type=send', requests[ 1 ].options.data );
}},
testMessagesAreDeliveredInABatchIfAjaxRequestIsInProgressWhenSendMessageIsCalled: function() { with( this ) {
// use startBatch to indicate a previous message POST is currently in progress.
org.activemq.Amq.startBatch();
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.sendMessage( 'queue://test2', '<message>test2</message>' );
// endBatch is the callback once the previous POST finishes. Triggers delivery of queued messages.
org.activemq.Amq.endBatch();
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( 'post', requests[1].options.method );
assertEqual( "destination=queue://test&message=<message>test</message>&type=send&d1=queue://test2&m1=<message>test2</message>&t1=send", requests[1].options.data );
}},
testAddListenerSendsListenMessage: function() { with( this ) {
org.activemq.Amq.addListener( 'client_id', 'topic://test', function(){} );
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( 'post', requests[1].options.method );
assertEqual( "destination=topic://test&message=client_id&type=listen", requests[1].options.data );
}},
testAddListenerMayIncludeASelector: function() { with( this ) {
org.activemq.Amq.addListener( 'client_id', 'topic://test', function(){}, {selector:"identifier='ALPHA'"} );
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( 'post', requests[1].options.method );
assertEqual( "destination=topic://test&message=client_id&type=listen", requests[1].options.data );
assertHashEqual( { selector: "identifier='ALPHA'" }, requests[1].options.headers );
}},
testAllQueuedMessagesContainingHeadersAreDeliveredIndividuallyToPreventHeaderConflicts: function() { with( this ) {
org.activemq.Amq.startBatch();
org.activemq.Amq.addListener( 'client_id_1', 'topic://test1', function(){}, {selector:"identifier='ALPHA'"} );
org.activemq.Amq.addListener( 'client_id_2', 'topic://test2', function(){}, {selector:"identifier='BRAVO'"} );
// simulate 1st post returning, which triggers 2nd post.
org.activemq.Amq.endBatch();
// poll & first listen have been sent.
assertEqual( 2, org.activemq.AmqAdapter.getRequests().length );
// second listen is still in the queue.
var queued = org.activemq.Amq.getMessageQueue();
assertEqual( 1, queued.length );
assertHashEqual( { selector: "identifier='BRAVO'" }, queued[ 0 ].headers );
assertHashEqual( { destination: 'topic://test2', message: 'client_id_2', messageType: 'listen' }, queued[ 0 ].message );
// when first post returns, the second listen gets sent.
// this endBatch simulates that second post returning.
org.activemq.Amq.endBatch();
var requests = org.activemq.AmqAdapter.getRequests();
var queued = org.activemq.Amq.getMessageQueue();
assertEqual( 3, requests.length );
assertEqual( 'post', requests[1].options.method );
assertEqual( "destination=topic://test1&message=client_id_1&type=listen", requests[1].options.data );
assertHashEqual( { selector: "identifier='ALPHA'" }, requests[1].options.headers );
assertEqual( 'post', requests[2].options.method );
assertEqual( "destination=topic://test2&message=client_id_2&type=listen", requests[2].options.data );
assertHashEqual( { selector: "identifier='BRAVO'" }, requests[2].options.headers );
assertEqual( 0, queued.length );
}},
// Is this desired behavior? Message order changes from that specified by the client.
testQueuedMessagesWithoutHeadersAreDeliveredInASinglePost: function() { with( this ) {
org.activemq.Amq.startBatch();
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.addListener( 'client_id_1', 'topic://test1', function(){}, {selector:"identifier='ALPHA'"} );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.addListener( 'client_id_2', 'topic://test2', function(){}, {selector:"identifier='BRAVO'"} );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
// poll & all sendMessage calls go out first.
org.activemq.Amq.endBatch();
assertEqual( 2, org.activemq.AmqAdapter.getRequests().length );
assertEqual( 2, org.activemq.Amq.getMessageQueue().length );
// first listen goes next.
org.activemq.Amq.endBatch();
assertEqual( 3, org.activemq.AmqAdapter.getRequests().length );
assertEqual( 1, org.activemq.Amq.getMessageQueue().length );
// final listen goes out.
org.activemq.Amq.endBatch();
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 4, requests.length );
assertEqual( 0, org.activemq.Amq.getMessageQueue().length );
assertEqual( "destination=queue://test&message=<message>test</message>&type=send&d1=queue://test&m1=<message>test</message>&t1=send&d2=queue://test&m2=<message>test</message>&t2=send", requests[ 1 ].options.data );
assertHashEqual( {}, requests[ 1 ].options.headers );
assertEqual( "destination=topic://test1&message=client_id_1&type=listen", requests[ 2 ].options.data );
assertEqual( "destination=topic://test2&message=client_id_2&type=listen", requests[ 3 ].options.data );
}},
testSelectorFromQueuedListenerIsNotAddedToLaterMessages: function() { with( this ) {
org.activemq.Amq.startBatch();
org.activemq.Amq.addListener( 'client_id_1', 'topic://test1', function(){}, {selector:"identifier='ALPHA'"} );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
// poll & listener go out first.
org.activemq.Amq.endBatch();
assertEqual( 2, org.activemq.AmqAdapter.getRequests().length );
assertEqual( 2, org.activemq.Amq.getMessageQueue().length );
// 2 sendMessages go next.
org.activemq.Amq.endBatch();
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 3, requests.length );
assertEqual( 0, org.activemq.Amq.getMessageQueue().length );
assertEqual( "destination=topic://test1&message=client_id_1&type=listen", requests[ 1 ].options.data );
assertHashEqual( { selector: "identifier='ALPHA'" }, requests[ 1 ].options.headers );
assertEqual( "destination=queue://test&message=<message>test</message>&type=send&d1=queue://test&m1=<message>test</message>&t1=send", requests[ 2 ].options.data );
assertHashEqual( {}, requests[ 2 ].options.headers );
}},
testAddListenerWithoutSelectorWillBeBatchedWithOtherMessages: function() { with( this ) {
org.activemq.Amq.startBatch();
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.addListener( 'client_id_1', 'topic://test1', function(){} );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.addListener( 'client_id_2', 'topic://test2', function(){} );
org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
org.activemq.Amq.endBatch();
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( 0, org.activemq.Amq.getMessageQueue().length );
assertEqual( "destination=queue://test&message=<message>test</message>&type=send&d1=topic://test1&m1=client_id_1&t1=listen&d2=queue://test&m2=<message>test</message>&t2=send&d3=topic://test2&m3=client_id_2&t3=listen&d4=queue://test&m4=<message>test</message>&t4=send", requests[ 1 ].options.data );
}},
testRemoveListenerSendsUnlistenMessage: function() { with( this ) {
org.activemq.Amq.removeListener( 'client_id', 'topic://test' );
var requests = org.activemq.AmqAdapter.getRequests();
assertEqual( 2, requests.length );
assertEqual( 'post', requests[1].options.method );
assertEqual( "destination=topic://test&message=client_id&type=unlisten", requests[1].options.data );
}},
testAddListenerCallbackIsCalledForReceivedMessages: function() { with( this ) {
// build an XML document like the one which the ajax implementers would pass to pollHandler
response = createXmlFromString( '<ajax-response><response id="client_id" destination="queue://test" >test message</response></ajax-response>' );
// we'll expect the callback to set this value
var callbackValue;
org.activemq.Amq.addListener( 'client_id', 'queue://test', function( msg ) { callbackValue = msg; } );
org.activemq.Amq.testPollHandler( response );
assertEqual( 'test message', callbackValue.textContent );
}}
});
// ]]>
</script>
</body>
</html>

View File

@ -0,0 +1,4 @@
jsunittest.js from http://jsunittest.com/
This directory is for core jsunittest files only! Put your unit test .html files up one level.
You should be referencing files in public/javascripts/ in your test files.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,54 @@
body, div, p, h1, h2, h3, ul, ol, span, a, table, td, form, img, li {
font-family: sans-serif;
}
body {
font-size:0.8em;
}
#log {
padding-bottom: 1em;
border-bottom: 2px solid #000;
margin-bottom: 2em;
}
.logsummary {
margin-top: 1em;
margin-bottom: 1em;
padding: 1ex;
border: 1px solid #000;
font-weight: bold;
}
.logtable {
width:100%;
border-collapse: collapse;
border: 1px dotted #666;
}
.logtable td, .logtable th {
text-align: left;
padding: 3px 8px;
border: 1px dotted #666;
}
.logtable .passed {
background-color: #cfc;
}
.logtable .failed, .logtable .error {
background-color: #fcc;
}
.logtable .warning {
background-color: #FC6;
}
.logtable td div.action_buttons {
display: inline;
}
.logtable td div.action_buttons input {
margin: 0 5px;
font-size: 10px;
}