https://issues.apache.org/activemq/browse/AMQ-2728 - first stab at refactoring Ajax API to conform Jetty 7 continuations

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@958932 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-06-29 11:34:35 +00:00
parent ff43b7a9aa
commit b73b8bc5b7
6 changed files with 203 additions and 58 deletions

View File

@ -0,0 +1,36 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.apache.activemq.web=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/test-reports/activemq-test.log
log4j.appender.out.append=true

View File

@ -96,10 +96,6 @@ org.activemq.Chat = function() {
return keyc;
};
// Again, you would generally use your particular js library to attach
// event handlers. However, I wanted to remove the dependency on the
// behaviors.js file in the original code, and I am demonstrating a library
// that can work with a variety of js libraries, so we are going old-school.
var addEvent = function(obj, type, fn) {
if (obj.addEventListener)
obj.addEventListener(type, fn, false);

View File

@ -0,0 +1,55 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
<title>Consume</title>
<script type="text/javascript" src="../js/jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="../js/amq_jquery_adapter.js"></script>
<script type="text/javascript" src="../js/amq.js"></script>
<script>
var amq = org.activemq.Amq;
amq.init({ uri: '../amq', logging: true, timeout: 45 });
amq.sendMessage("queue://test", "passed");
setTimeout(function() {
var testHandler = function(message) {
document.getElementById("received").innerHTML = message.textContent;
amq.removeListener("test", "queue://test");
}
amq.addListener("test", "queue://test", testHandler);
}, (3 * 1000));
</script>
</head>
<body>
<div id="received">
failed
</div>
</body>
</html>

View File

@ -0,0 +1,51 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<title>Consume</title>
<script type="text/javascript" src="../js/jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="../js/amq_jquery_adapter.js"></script>
<script type="text/javascript" src="../js/amq.js"></script>
<script>
var amq = org.activemq.Amq;
amq.init({ uri: '../amq', logging: true, timeout: 45 });
var testHandler = function(message) {
document.getElementById("received").innerHTML = message.textContent;
amq.removeListener("test", "queue://test");
}
amq.addListener("test", "queue://test", testHandler);
amq.sendMessage("queue://test", "passed");
</script>
</head>
<body>
<div id="received">
failed
</div>
</body>
</html>

View File

@ -263,46 +263,59 @@ public class MessageListenerServlet extends MessageServletSupport {
LOG.debug("doMessage timeout=" + timeout);
}
Continuation continuation = ContinuationSupport.getContinuation(request);
Listener listener = getListener(request);
if (listener != null && continuation != null && !continuation.isInitial()) {
listener.access();
}
Message message = null;
message = (Message)request.getAttribute("message");
synchronized (client) {
List consumers = client.getConsumers();
MessageAvailableConsumer consumer = null;
MessageAvailableConsumer consumer = (MessageAvailableConsumer)request.getAttribute("consumer");
// Look for a message that is ready to go
for (int i = 0; message == null && i < consumers.size(); i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
if (consumer.getAvailableListener() == null) {
continue;
}
// Look for any available messages
message = consumer.receiveNoWait();
if (LOG.isDebugEnabled()) {
LOG.debug("received " + message + " from " + consumer);
if (message == null) {
// Look for a message that is ready to go
for (int i = 0; message == null && i < consumers.size(); i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
if (consumer.getAvailableListener() == null) {
continue;
}
// Look for any available messages
message = consumer.receive(10);
if (LOG.isDebugEnabled()) {
LOG.debug("received " + message + " from " + consumer);
}
}
}
// Get an existing Continuation or create a new one if there are no
// messages
if (message == null) {
Continuation continuation = ContinuationSupport.getContinuation(request);
if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_OK);
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
writer.println("<ajax-response>");
writer.print("</ajax-response>");
if (message == null && continuation.isInitial()) {
// register this continuation with our listener.
listener.setContinuation(continuation);
writer.flush();
String m = swriter.toString();
response.getWriter().println(m);
return;
}
// Get the continuation object (may wait and/or retry
// request here).
continuation.setTimeout(timeout);
continuation.suspend();
// Fetch the listeners
Listener listener = getListener(request);
// register this continuation with our listener.
listener.setContinuation(continuation);
return;
}
// prepare the responds
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
@ -319,15 +332,8 @@ public class MessageListenerServlet extends MessageServletSupport {
if (message != null) {
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
writer.print("<response id='");
writer.print(id);
writer.print("'");
if (destinationName != null) {
writer.print(" destination='" + destinationName + "' ");
}
writer.print(">");
writeMessageResponse(writer, message);
writer.println("</response>");
writeMessageResponse(writer, message, id, destinationName);
messages++;
}
@ -347,22 +353,10 @@ public class MessageListenerServlet extends MessageServletSupport {
messages++;
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
writer.print("<response id='");
writer.print(id);
writer.print("'");
if (destinationName != null) {
writer.print(" destination='" + destinationName + "' ");
}
writer.print(">");
writeMessageResponse(writer, message);
writer.println("</response>");
writeMessageResponse(writer, message, id, destinationName);
}
}
// Add poll message
// writer.println("<response type='object'
// id='amqPoll'><ok/></response>");
writer.print("</ajax-response>");
writer.flush();
@ -372,7 +366,14 @@ public class MessageListenerServlet extends MessageServletSupport {
}
protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {
writer.print("<response id='");
writer.print(id);
writer.print("'");
if (destinationName != null) {
writer.print(" destination='" + destinationName + "' ");
}
writer.print(">");
if (message instanceof TextMessage) {
TextMessage textMsg = (TextMessage)message;
String txt = textMsg.getText();
@ -385,6 +386,7 @@ public class MessageListenerServlet extends MessageServletSupport {
Object object = objectMsg.getObject();
writer.print(object.toString());
}
writer.println("</response>");
}
protected Listener getListener(HttpServletRequest request) {
@ -464,7 +466,14 @@ public class MessageListenerServlet extends MessageServletSupport {
LOG.debug("message for " + consumer + "continuation=" + continuation);
}
if (continuation != null) {
continuation.resume();
try {
Message message = consumer.receive(10);
continuation.setAttribute("message", message);
continuation.setAttribute("consumer", consumer);
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
continuation.resume();
} else if (System.currentTimeMillis() - lastAccess > 2 * maximumReadTimeout) {
new Thread() {
public void run() {
@ -472,7 +481,6 @@ public class MessageListenerServlet extends MessageServletSupport {
};
}.start();
}
continuation = null;
}
}

View File

@ -300,13 +300,12 @@ public abstract class MessageServletSupport extends HttpServlet {
boolean isTopic = defaultTopicFlag;
if (destinationName.startsWith("topic://")) {
isTopic = true;
destinationName = destinationName.substring(8);
} else if (destinationName.startsWith("channel://")) {
} else if (destinationName.startsWith("channel://") || destinationName.startsWith("queue://")) {
isTopic = false;
destinationName = destinationName.substring(10);
} else {
isTopic = isTopic(request);
}
destinationName = destinationName.substring(destinationName.indexOf("://") + 3);
if (destinationOptions != null) {
destinationName += "?" + destinationOptions;